1use crate::db::models::Event;
2use crate::error::{IntentError, Result};
3use chrono::Utc;
4use sqlx::{Row, SqlitePool};
5use std::sync::Arc;
6
7pub struct EventManager<'a> {
8 pool: &'a SqlitePool,
9 notifier: crate::notifications::NotificationSender,
10 cli_notifier: Option<crate::dashboard::cli_notifier::CliNotifier>,
11 project_path: Option<String>,
12}
13
14impl<'a> EventManager<'a> {
15 pub fn new(pool: &'a SqlitePool) -> Self {
16 Self {
17 pool,
18 notifier: crate::notifications::NotificationSender::new(None),
19 cli_notifier: Some(crate::dashboard::cli_notifier::CliNotifier::new()),
20 project_path: None,
21 }
22 }
23
24 pub fn with_project_path(pool: &'a SqlitePool, project_path: String) -> Self {
26 Self {
27 pool,
28 notifier: crate::notifications::NotificationSender::new(None),
29 cli_notifier: Some(crate::dashboard::cli_notifier::CliNotifier::new()),
30 project_path: Some(project_path),
31 }
32 }
33
34 pub fn with_websocket(
36 pool: &'a SqlitePool,
37 ws_state: Arc<crate::dashboard::websocket::WebSocketState>,
38 project_path: String,
39 ) -> Self {
40 Self {
41 pool,
42 notifier: crate::notifications::NotificationSender::new(Some(ws_state)),
43 cli_notifier: None, project_path: Some(project_path),
45 }
46 }
47
48 async fn notify_event_created(&self, event: &Event) {
50 use crate::dashboard::websocket::DatabaseOperationPayload;
51
52 if let Some(project_path) = &self.project_path {
54 let event_json = match serde_json::to_value(event) {
55 Ok(json) => json,
56 Err(e) => {
57 tracing::warn!(error = %e, "Failed to serialize event for notification");
58 return;
59 },
60 };
61
62 let payload =
63 DatabaseOperationPayload::event_created(event.id, event_json, project_path.clone());
64 self.notifier.send(payload).await;
65 }
66
67 if let Some(cli_notifier) = &self.cli_notifier {
69 cli_notifier
70 .notify_event_added(event.task_id, event.id, self.project_path.clone())
71 .await;
72 }
73 }
74
75 async fn notify_event_updated(&self, event: &Event) {
77 use crate::dashboard::websocket::DatabaseOperationPayload;
78
79 let Some(project_path) = &self.project_path else {
80 return;
81 };
82
83 let event_json = match serde_json::to_value(event) {
84 Ok(json) => json,
85 Err(e) => {
86 tracing::warn!("Failed to serialize event for notification: {}", e);
87 return;
88 },
89 };
90
91 let payload =
92 DatabaseOperationPayload::event_updated(event.id, event_json, project_path.clone());
93 self.notifier.send(payload).await;
94 }
95
96 async fn notify_event_deleted(&self, event_id: i64) {
98 use crate::dashboard::websocket::DatabaseOperationPayload;
99
100 let Some(project_path) = &self.project_path else {
101 return;
102 };
103
104 let payload = DatabaseOperationPayload::event_deleted(event_id, project_path.clone());
105 self.notifier.send(payload).await;
106 }
107
108 pub async fn add_event(
110 &self,
111 task_id: i64,
112 log_type: &str,
113 discussion_data: &str,
114 ) -> Result<Event> {
115 let task_exists: bool =
117 sqlx::query_scalar::<_, bool>(crate::sql_constants::CHECK_TASK_EXISTS)
118 .bind(task_id)
119 .fetch_one(self.pool)
120 .await?;
121
122 if !task_exists {
123 return Err(IntentError::TaskNotFound(task_id));
124 }
125
126 let now = Utc::now();
127
128 let result = sqlx::query(
129 r#"
130 INSERT INTO events (task_id, log_type, discussion_data, timestamp)
131 VALUES (?, ?, ?, ?)
132 "#,
133 )
134 .bind(task_id)
135 .bind(log_type)
136 .bind(discussion_data)
137 .bind(now)
138 .execute(self.pool)
139 .await?;
140
141 let id = result.last_insert_rowid();
142
143 let event = Event {
144 id,
145 task_id,
146 timestamp: now,
147 log_type: log_type.to_string(),
148 discussion_data: discussion_data.to_string(),
149 };
150
151 self.notify_event_created(&event).await;
153
154 Ok(event)
155 }
156
157 pub async fn update_event(
159 &self,
160 event_id: i64,
161 log_type: Option<&str>,
162 discussion_data: Option<&str>,
163 ) -> Result<Event> {
164 let existing_event: Option<Event> =
166 sqlx::query_as(crate::sql_constants::SELECT_EVENT_BY_ID)
167 .bind(event_id)
168 .fetch_optional(self.pool)
169 .await?;
170
171 let existing_event = existing_event.ok_or(IntentError::InvalidInput(format!(
172 "Event {} not found",
173 event_id
174 )))?;
175
176 let new_log_type = log_type.unwrap_or(&existing_event.log_type);
178 let new_discussion_data = discussion_data.unwrap_or(&existing_event.discussion_data);
179
180 sqlx::query(
181 r#"
182 UPDATE events
183 SET log_type = ?, discussion_data = ?
184 WHERE id = ?
185 "#,
186 )
187 .bind(new_log_type)
188 .bind(new_discussion_data)
189 .bind(event_id)
190 .execute(self.pool)
191 .await?;
192
193 let updated_event = Event {
194 id: existing_event.id,
195 task_id: existing_event.task_id,
196 timestamp: existing_event.timestamp,
197 log_type: new_log_type.to_string(),
198 discussion_data: new_discussion_data.to_string(),
199 };
200
201 self.notify_event_updated(&updated_event).await;
203
204 Ok(updated_event)
205 }
206
207 pub async fn delete_event(&self, event_id: i64) -> Result<()> {
209 let event: Option<Event> = sqlx::query_as(crate::sql_constants::SELECT_EVENT_BY_ID)
211 .bind(event_id)
212 .fetch_optional(self.pool)
213 .await?;
214
215 let _event = event.ok_or(IntentError::InvalidInput(format!(
216 "Event {} not found",
217 event_id
218 )))?;
219
220 let _ = sqlx::query("DELETE FROM events_fts WHERE rowid = ?")
222 .bind(event_id)
223 .execute(self.pool)
224 .await;
225
226 sqlx::query("DELETE FROM events WHERE id = ?")
228 .bind(event_id)
229 .execute(self.pool)
230 .await?;
231
232 self.notify_event_deleted(event_id).await;
234
235 Ok(())
236 }
237
238 pub async fn list_events(
240 &self,
241 task_id: Option<i64>,
242 limit: Option<i64>,
243 log_type: Option<String>,
244 since: Option<String>,
245 ) -> Result<Vec<Event>> {
246 if let Some(tid) = task_id {
248 let task_exists: bool =
249 sqlx::query_scalar::<_, bool>(crate::sql_constants::CHECK_TASK_EXISTS)
250 .bind(tid)
251 .fetch_one(self.pool)
252 .await?;
253
254 if !task_exists {
255 return Err(IntentError::TaskNotFound(tid));
256 }
257 }
258
259 let limit = limit.unwrap_or(50);
260
261 let since_timestamp = if let Some(duration_str) = since {
263 Some(crate::time_utils::parse_duration(&duration_str)?)
264 } else {
265 None
266 };
267
268 let mut query = String::from(crate::sql_constants::SELECT_EVENT_BASE);
270 let mut conditions = Vec::new();
271
272 if task_id.is_some() {
273 conditions.push("task_id = ?");
274 }
275
276 if log_type.is_some() {
277 conditions.push("log_type = ?");
278 }
279
280 if since_timestamp.is_some() {
281 conditions.push("timestamp >= ?");
282 }
283
284 if !conditions.is_empty() {
285 query.push_str(" AND ");
286 query.push_str(&conditions.join(" AND "));
287 }
288
289 query.push_str(" ORDER BY timestamp DESC LIMIT ?");
290
291 let mut sql_query = sqlx::query_as::<_, Event>(&query);
293
294 if let Some(tid) = task_id {
295 sql_query = sql_query.bind(tid);
296 }
297
298 if let Some(ref typ) = log_type {
299 sql_query = sql_query.bind(typ);
300 }
301
302 if let Some(ts) = since_timestamp {
303 sql_query = sql_query.bind(ts);
304 }
305
306 sql_query = sql_query.bind(limit);
307
308 let events = sql_query.fetch_all(self.pool).await?;
309
310 Ok(events)
311 }
312
313 pub async fn search_events_fts5(
315 &self,
316 query: &str,
317 limit: Option<i64>,
318 ) -> Result<Vec<EventSearchResult>> {
319 let limit = limit.unwrap_or(20);
320
321 let results = sqlx::query(
323 r#"
324 SELECT
325 e.id,
326 e.task_id,
327 e.timestamp,
328 e.log_type,
329 e.discussion_data,
330 snippet(events_fts, 0, '**', '**', '...', 15) as match_snippet
331 FROM events_fts
332 INNER JOIN events e ON events_fts.rowid = e.id
333 WHERE events_fts MATCH ?
334 ORDER BY rank
335 LIMIT ?
336 "#,
337 )
338 .bind(query)
339 .bind(limit)
340 .fetch_all(self.pool)
341 .await?;
342
343 let mut search_results = Vec::new();
344 for row in results {
345 let event = Event {
346 id: row.get("id"),
347 task_id: row.get("task_id"),
348 timestamp: row.get("timestamp"),
349 log_type: row.get("log_type"),
350 discussion_data: row.get("discussion_data"),
351 };
352 let match_snippet: String = row.get("match_snippet");
353
354 search_results.push(EventSearchResult {
355 event,
356 match_snippet,
357 });
358 }
359
360 Ok(search_results)
361 }
362}
363
364#[derive(Debug)]
366pub struct EventSearchResult {
367 pub event: Event,
368 pub match_snippet: String,
369}
370
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tasks::TaskManager;
    use crate::test_utils::test_helpers::TestContext;

    /// Creates a task named `name` in the test database and returns its id.
    async fn create_task(ctx: &TestContext, name: &str) -> i64 {
        TaskManager::new(ctx.pool())
            .add_task(name, None, None, None)
            .await
            .unwrap()
            .id
    }

    /// Adding an event to an existing task echoes back the stored fields.
    #[tokio::test]
    async fn test_add_event() {
        let ctx = TestContext::new().await;
        let task_id = create_task(&ctx, "Test task").await;
        let events = EventManager::new(ctx.pool());

        let created = events
            .add_event(task_id, "decision", "Test decision")
            .await
            .unwrap();

        assert_eq!(created.task_id, task_id);
        assert_eq!(created.log_type, "decision");
        assert_eq!(created.discussion_data, "Test decision");
    }

    /// Adding an event to a missing task is rejected with `TaskNotFound`.
    #[tokio::test]
    async fn test_add_event_nonexistent_task() {
        let ctx = TestContext::new().await;
        let events = EventManager::new(ctx.pool());

        let outcome = events.add_event(999, "decision", "Test").await;

        assert!(matches!(outcome, Err(IntentError::TaskNotFound(999))));
    }

    /// Events for a task come back newest first.
    #[tokio::test]
    async fn test_list_events() {
        let ctx = TestContext::new().await;
        let task_id = create_task(&ctx, "Test task").await;
        let events = EventManager::new(ctx.pool());

        for (kind, text) in [
            ("decision", "Decision 1"),
            ("blocker", "Blocker 1"),
            ("milestone", "Milestone 1"),
        ] {
            events.add_event(task_id, kind, text).await.unwrap();
        }

        let listed = events
            .list_events(Some(task_id), None, None, None)
            .await
            .unwrap();
        assert_eq!(listed.len(), 3);

        // Most recent insert is expected first.
        let kinds: Vec<&str> = listed.iter().map(|e| e.log_type.as_str()).collect();
        assert_eq!(kinds, ["milestone", "blocker", "decision"]);
    }

    /// An explicit limit caps the number of returned rows.
    #[tokio::test]
    async fn test_list_events_with_limit() {
        let ctx = TestContext::new().await;
        let task_id = create_task(&ctx, "Test task").await;
        let events = EventManager::new(ctx.pool());

        for i in 0..5 {
            let text = format!("Event {}", i);
            events.add_event(task_id, "test", &text).await.unwrap();
        }

        let listed = events
            .list_events(Some(task_id), Some(3), None, None)
            .await
            .unwrap();

        assert_eq!(listed.len(), 3);
    }

    /// Listing for a missing task is rejected with `TaskNotFound`.
    #[tokio::test]
    async fn test_list_events_nonexistent_task() {
        let ctx = TestContext::new().await;
        let events = EventManager::new(ctx.pool());

        let outcome = events.list_events(Some(999), None, None, None).await;

        assert!(matches!(outcome, Err(IntentError::TaskNotFound(999))));
    }

    /// A task with no events yields an empty list.
    #[tokio::test]
    async fn test_list_events_empty() {
        let ctx = TestContext::new().await;
        let task_id = create_task(&ctx, "Test task").await;
        let events = EventManager::new(ctx.pool());

        let listed = events
            .list_events(Some(task_id), None, None, None)
            .await
            .unwrap();

        assert!(listed.is_empty());
    }

    /// Updating both fields replaces both and keeps id and task.
    #[tokio::test]
    async fn test_update_event() {
        let ctx = TestContext::new().await;
        let task_id = create_task(&ctx, "Test task").await;
        let events = EventManager::new(ctx.pool());

        let original = events
            .add_event(task_id, "decision", "Initial decision")
            .await
            .unwrap();

        let updated = events
            .update_event(original.id, Some("milestone"), Some("Updated decision"))
            .await
            .unwrap();

        assert_eq!(updated.id, original.id);
        assert_eq!(updated.task_id, task_id);
        assert_eq!(updated.log_type, "milestone");
        assert_eq!(updated.discussion_data, "Updated decision");
    }

    /// Passing `None` for a field keeps its stored value.
    #[tokio::test]
    async fn test_update_event_partial() {
        let ctx = TestContext::new().await;
        let task_id = create_task(&ctx, "Test task").await;
        let events = EventManager::new(ctx.pool());

        let original = events
            .add_event(task_id, "decision", "Initial decision")
            .await
            .unwrap();

        let updated = events
            .update_event(original.id, None, Some("Updated data only"))
            .await
            .unwrap();

        assert_eq!(updated.log_type, "decision");
        assert_eq!(updated.discussion_data, "Updated data only");
    }

    /// Updating a missing event is rejected with `InvalidInput`.
    #[tokio::test]
    async fn test_update_event_nonexistent() {
        let ctx = TestContext::new().await;
        let events = EventManager::new(ctx.pool());

        let outcome = events
            .update_event(999, Some("decision"), Some("Test"))
            .await;

        assert!(matches!(outcome, Err(IntentError::InvalidInput(_))));
    }

    /// A deleted event no longer shows up in the task's listing.
    #[tokio::test]
    async fn test_delete_event() {
        let ctx = TestContext::new().await;
        let task_id = create_task(&ctx, "Test task").await;
        let events = EventManager::new(ctx.pool());

        let doomed = events
            .add_event(task_id, "decision", "To be deleted")
            .await
            .unwrap();

        events.delete_event(doomed.id).await.unwrap();

        let remaining = events
            .list_events(Some(task_id), None, None, None)
            .await
            .unwrap();
        assert!(remaining.is_empty());
    }

    /// Deleting a missing event is rejected with `InvalidInput`.
    #[tokio::test]
    async fn test_delete_event_nonexistent() {
        let ctx = TestContext::new().await;
        let events = EventManager::new(ctx.pool());

        let outcome = events.delete_event(999).await;

        assert!(matches!(outcome, Err(IntentError::InvalidInput(_))));
    }

    /// The `log_type` filter returns only events of that type.
    #[tokio::test]
    async fn test_list_events_filter_by_type() {
        let ctx = TestContext::new().await;
        let task_id = create_task(&ctx, "Test task").await;
        let events = EventManager::new(ctx.pool());

        for (kind, text) in [
            ("decision", "Decision 1"),
            ("blocker", "Blocker 1"),
            ("decision", "Decision 2"),
        ] {
            events.add_event(task_id, kind, text).await.unwrap();
        }

        let decisions = events
            .list_events(Some(task_id), None, Some("decision".to_string()), None)
            .await
            .unwrap();

        assert_eq!(decisions.len(), 2);
        assert!(decisions.iter().all(|e| e.log_type == "decision"));
    }

    /// Listing without a task id spans events from every task.
    #[tokio::test]
    async fn test_list_events_global() {
        let ctx = TestContext::new().await;
        let first_task = create_task(&ctx, "Task 1").await;
        let second_task = create_task(&ctx, "Task 2").await;
        let events = EventManager::new(ctx.pool());

        events
            .add_event(first_task, "decision", "Task 1 Decision")
            .await
            .unwrap();
        events
            .add_event(second_task, "decision", "Task 2 Decision")
            .await
            .unwrap();

        let listed = events.list_events(None, None, None, None).await.unwrap();

        assert!(listed.len() >= 2);

        let count_for = |task_id: i64| listed.iter().filter(|e| e.task_id == task_id).count();
        assert_eq!(count_for(first_task), 1);
        assert_eq!(count_for(second_task), 1);
    }
}