1use crate::db::models::Event;
2use crate::error::{IntentError, Result};
3use chrono::Utc;
4use sqlx::{Row, SqlitePool};
5use std::sync::Arc;
6
7pub struct EventManager<'a> {
8 pool: &'a SqlitePool,
9 notifier: crate::notifications::NotificationSender,
10 cli_notifier: Option<crate::dashboard::cli_notifier::CliNotifier>,
11 project_path: Option<String>,
12}
13
14impl<'a> EventManager<'a> {
15 pub fn new(pool: &'a SqlitePool) -> Self {
16 Self {
17 pool,
18 notifier: crate::notifications::NotificationSender::new(None),
19 cli_notifier: Some(crate::dashboard::cli_notifier::CliNotifier::new()),
20 project_path: None,
21 }
22 }
23
24 pub fn with_project_path(pool: &'a SqlitePool, project_path: String) -> Self {
26 Self {
27 pool,
28 notifier: crate::notifications::NotificationSender::new(None),
29 cli_notifier: Some(crate::dashboard::cli_notifier::CliNotifier::new()),
30 project_path: Some(project_path),
31 }
32 }
33
34 pub fn with_websocket(
36 pool: &'a SqlitePool,
37 ws_state: Arc<crate::dashboard::websocket::WebSocketState>,
38 project_path: String,
39 ) -> Self {
40 Self {
41 pool,
42 notifier: crate::notifications::NotificationSender::new(Some(ws_state)),
43 cli_notifier: None, project_path: Some(project_path),
45 }
46 }
47
48 async fn notify_event_created(&self, event: &Event) {
50 use crate::dashboard::websocket::DatabaseOperationPayload;
51
52 if let Some(project_path) = &self.project_path {
54 let event_json = match serde_json::to_value(event) {
55 Ok(json) => json,
56 Err(e) => {
57 tracing::warn!(error = %e, "Failed to serialize event for notification");
58 return;
59 },
60 };
61
62 let payload =
63 DatabaseOperationPayload::event_created(event.id, event_json, project_path.clone());
64 self.notifier.send(payload).await;
65 }
66
67 if let Some(cli_notifier) = &self.cli_notifier {
69 cli_notifier
70 .notify_event_added(event.task_id, event.id, self.project_path.clone())
71 .await;
72 }
73 }
74
75 async fn notify_event_updated(&self, event: &Event) {
77 use crate::dashboard::websocket::DatabaseOperationPayload;
78
79 let Some(project_path) = &self.project_path else {
80 return;
81 };
82
83 let event_json = match serde_json::to_value(event) {
84 Ok(json) => json,
85 Err(e) => {
86 tracing::warn!("Failed to serialize event for notification: {}", e);
87 return;
88 },
89 };
90
91 let payload =
92 DatabaseOperationPayload::event_updated(event.id, event_json, project_path.clone());
93 self.notifier.send(payload).await;
94 }
95
96 async fn notify_event_deleted(&self, event_id: i64) {
98 use crate::dashboard::websocket::DatabaseOperationPayload;
99
100 let Some(project_path) = &self.project_path else {
101 return;
102 };
103
104 let payload = DatabaseOperationPayload::event_deleted(event_id, project_path.clone());
105 self.notifier.send(payload).await;
106 }
107
108 pub async fn add_event(
110 &self,
111 task_id: i64,
112 log_type: String,
113 discussion_data: String,
114 ) -> Result<Event> {
115 let task_exists: bool =
117 sqlx::query_scalar::<_, bool>(crate::sql_constants::CHECK_TASK_EXISTS)
118 .bind(task_id)
119 .fetch_one(self.pool)
120 .await?;
121
122 if !task_exists {
123 return Err(IntentError::TaskNotFound(task_id));
124 }
125
126 let now = Utc::now();
127
128 let result = sqlx::query(
129 r#"
130 INSERT INTO events (task_id, log_type, discussion_data, timestamp)
131 VALUES (?, ?, ?, ?)
132 "#,
133 )
134 .bind(task_id)
135 .bind(&log_type)
136 .bind(&discussion_data)
137 .bind(now)
138 .execute(self.pool)
139 .await?;
140
141 let id = result.last_insert_rowid();
142
143 let event = Event {
144 id,
145 task_id,
146 timestamp: now,
147 log_type,
148 discussion_data,
149 };
150
151 self.notify_event_created(&event).await;
153
154 Ok(event)
155 }
156
157 pub async fn update_event(
159 &self,
160 event_id: i64,
161 log_type: Option<&str>,
162 discussion_data: Option<&str>,
163 ) -> Result<Event> {
164 let existing_event: Option<Event> =
166 sqlx::query_as(crate::sql_constants::SELECT_EVENT_BY_ID)
167 .bind(event_id)
168 .fetch_optional(self.pool)
169 .await?;
170
171 let existing_event = existing_event.ok_or(IntentError::InvalidInput(format!(
172 "Event {} not found",
173 event_id
174 )))?;
175
176 let new_log_type = log_type.unwrap_or(&existing_event.log_type);
178 let new_discussion_data = discussion_data.unwrap_or(&existing_event.discussion_data);
179
180 sqlx::query(
181 r#"
182 UPDATE events
183 SET log_type = ?, discussion_data = ?
184 WHERE id = ?
185 "#,
186 )
187 .bind(new_log_type)
188 .bind(new_discussion_data)
189 .bind(event_id)
190 .execute(self.pool)
191 .await?;
192
193 let updated_event = Event {
194 id: existing_event.id,
195 task_id: existing_event.task_id,
196 timestamp: existing_event.timestamp,
197 log_type: new_log_type.to_string(),
198 discussion_data: new_discussion_data.to_string(),
199 };
200
201 self.notify_event_updated(&updated_event).await;
203
204 Ok(updated_event)
205 }
206
207 pub async fn delete_event(&self, event_id: i64) -> Result<()> {
209 let event: Option<Event> = sqlx::query_as(crate::sql_constants::SELECT_EVENT_BY_ID)
211 .bind(event_id)
212 .fetch_optional(self.pool)
213 .await?;
214
215 let _event = event.ok_or(IntentError::InvalidInput(format!(
216 "Event {} not found",
217 event_id
218 )))?;
219
220 let _ = sqlx::query("DELETE FROM events_fts WHERE rowid = ?")
222 .bind(event_id)
223 .execute(self.pool)
224 .await;
225
226 sqlx::query("DELETE FROM events WHERE id = ?")
228 .bind(event_id)
229 .execute(self.pool)
230 .await?;
231
232 self.notify_event_deleted(event_id).await;
234
235 Ok(())
236 }
237
238 pub async fn list_events(
240 &self,
241 task_id: Option<i64>,
242 limit: Option<i64>,
243 log_type: Option<String>,
244 since: Option<String>,
245 ) -> Result<Vec<Event>> {
246 if let Some(tid) = task_id {
248 let task_exists: bool =
249 sqlx::query_scalar::<_, bool>(crate::sql_constants::CHECK_TASK_EXISTS)
250 .bind(tid)
251 .fetch_one(self.pool)
252 .await?;
253
254 if !task_exists {
255 return Err(IntentError::TaskNotFound(tid));
256 }
257 }
258
259 let limit = limit.unwrap_or(50);
260
261 let since_timestamp = if let Some(duration_str) = since {
263 Some(crate::time_utils::parse_duration(&duration_str)?)
264 } else {
265 None
266 };
267
268 let mut query = String::from(crate::sql_constants::SELECT_EVENT_BASE);
270 let mut conditions = Vec::new();
271
272 if task_id.is_some() {
273 conditions.push("task_id = ?");
274 }
275
276 if log_type.is_some() {
277 conditions.push("log_type = ?");
278 }
279
280 if since_timestamp.is_some() {
281 conditions.push("timestamp >= ?");
282 }
283
284 if !conditions.is_empty() {
285 query.push_str(" AND ");
286 query.push_str(&conditions.join(" AND "));
287 }
288
289 query.push_str(" ORDER BY timestamp DESC LIMIT ?");
290
291 let mut sql_query = sqlx::query_as::<_, Event>(&query);
293
294 if let Some(tid) = task_id {
295 sql_query = sql_query.bind(tid);
296 }
297
298 if let Some(ref typ) = log_type {
299 sql_query = sql_query.bind(typ);
300 }
301
302 if let Some(ts) = since_timestamp {
303 sql_query = sql_query.bind(ts);
304 }
305
306 sql_query = sql_query.bind(limit);
307
308 let events = sql_query.fetch_all(self.pool).await?;
309
310 Ok(events)
311 }
312
313 pub async fn search_events_fts5(
315 &self,
316 query: &str,
317 limit: Option<i64>,
318 ) -> Result<Vec<EventSearchResult>> {
319 let limit = limit.unwrap_or(20);
320
321 let results = sqlx::query(
323 r#"
324 SELECT
325 e.id,
326 e.task_id,
327 e.timestamp,
328 e.log_type,
329 e.discussion_data,
330 snippet(events_fts, 0, '**', '**', '...', 15) as match_snippet
331 FROM events_fts
332 INNER JOIN events e ON events_fts.rowid = e.id
333 WHERE events_fts MATCH ?
334 ORDER BY rank
335 LIMIT ?
336 "#,
337 )
338 .bind(query)
339 .bind(limit)
340 .fetch_all(self.pool)
341 .await?;
342
343 let mut search_results = Vec::new();
344 for row in results {
345 let event = Event {
346 id: row.get("id"),
347 task_id: row.get("task_id"),
348 timestamp: row.get("timestamp"),
349 log_type: row.get("log_type"),
350 discussion_data: row.get("discussion_data"),
351 };
352 let match_snippet: String = row.get("match_snippet");
353
354 search_results.push(EventSearchResult {
355 event,
356 match_snippet,
357 });
358 }
359
360 Ok(search_results)
361 }
362}
363
364#[derive(Debug)]
366pub struct EventSearchResult {
367 pub event: Event,
368 pub match_snippet: String,
369}
370
371impl crate::backend::EventBackend for EventManager<'_> {
372 fn add_event(
373 &self,
374 task_id: i64,
375 log_type: String,
376 discussion_data: String,
377 ) -> impl std::future::Future<Output = Result<Event>> + Send {
378 self.add_event(task_id, log_type, discussion_data)
379 }
380
381 fn list_events(
382 &self,
383 task_id: Option<i64>,
384 limit: Option<i64>,
385 log_type: Option<String>,
386 since: Option<String>,
387 ) -> impl std::future::Future<Output = Result<Vec<Event>>> + Send {
388 self.list_events(task_id, limit, log_type, since)
389 }
390}
391
392#[cfg(test)]
393mod tests {
394 use super::*;
395 use crate::tasks::TaskManager;
396 use crate::test_utils::test_helpers::TestContext;
397
398 #[tokio::test]
399 async fn test_add_event() {
400 let ctx = TestContext::new().await;
401 let task_mgr = TaskManager::new(ctx.pool());
402 let event_mgr = EventManager::new(ctx.pool());
403
404 let task = task_mgr
405 .add_task("Test task".to_string(), None, None, None, None, None)
406 .await
407 .unwrap();
408 let event = event_mgr
409 .add_event(task.id, "decision".to_string(), "Test decision".to_string())
410 .await
411 .unwrap();
412
413 assert_eq!(event.task_id, task.id);
414 assert_eq!(event.log_type, "decision");
415 assert_eq!(event.discussion_data, "Test decision");
416 }
417
418 #[tokio::test]
419 async fn test_add_event_nonexistent_task() {
420 let ctx = TestContext::new().await;
421 let event_mgr = EventManager::new(ctx.pool());
422
423 let result = event_mgr
424 .add_event(999, "decision".to_string(), "Test".to_string())
425 .await;
426 assert!(matches!(result, Err(IntentError::TaskNotFound(999))));
427 }
428
429 #[tokio::test]
430 async fn test_list_events() {
431 let ctx = TestContext::new().await;
432 let task_mgr = TaskManager::new(ctx.pool());
433 let event_mgr = EventManager::new(ctx.pool());
434
435 let task = task_mgr
436 .add_task("Test task".to_string(), None, None, None, None, None)
437 .await
438 .unwrap();
439
440 event_mgr
442 .add_event(task.id, "decision".to_string(), "Decision 1".to_string())
443 .await
444 .unwrap();
445 event_mgr
446 .add_event(task.id, "blocker".to_string(), "Blocker 1".to_string())
447 .await
448 .unwrap();
449 event_mgr
450 .add_event(task.id, "milestone".to_string(), "Milestone 1".to_string())
451 .await
452 .unwrap();
453
454 let events = event_mgr
455 .list_events(Some(task.id), None, None, None)
456 .await
457 .unwrap();
458 assert_eq!(events.len(), 3);
459
460 assert_eq!(events[0].log_type, "milestone");
462 assert_eq!(events[1].log_type, "blocker");
463 assert_eq!(events[2].log_type, "decision");
464 }
465
466 #[tokio::test]
467 async fn test_list_events_with_limit() {
468 let ctx = TestContext::new().await;
469 let task_mgr = TaskManager::new(ctx.pool());
470 let event_mgr = EventManager::new(ctx.pool());
471
472 let task = task_mgr
473 .add_task("Test task".to_string(), None, None, None, None, None)
474 .await
475 .unwrap();
476
477 for i in 0..5 {
479 event_mgr
480 .add_event(task.id, "test".to_string(), format!("Event {}", i))
481 .await
482 .unwrap();
483 }
484
485 let events = event_mgr
486 .list_events(Some(task.id), Some(3), None, None)
487 .await
488 .unwrap();
489 assert_eq!(events.len(), 3);
490 }
491
492 #[tokio::test]
493 async fn test_list_events_nonexistent_task() {
494 let ctx = TestContext::new().await;
495 let event_mgr = EventManager::new(ctx.pool());
496
497 let result = event_mgr.list_events(Some(999), None, None, None).await;
498 assert!(matches!(result, Err(IntentError::TaskNotFound(999))));
499 }
500
501 #[tokio::test]
502 async fn test_list_events_empty() {
503 let ctx = TestContext::new().await;
504 let task_mgr = TaskManager::new(ctx.pool());
505 let event_mgr = EventManager::new(ctx.pool());
506
507 let task = task_mgr
508 .add_task("Test task".to_string(), None, None, None, None, None)
509 .await
510 .unwrap();
511
512 let events = event_mgr
513 .list_events(Some(task.id), None, None, None)
514 .await
515 .unwrap();
516 assert_eq!(events.len(), 0);
517 }
518
519 #[tokio::test]
520 async fn test_update_event() {
521 let ctx = TestContext::new().await;
522 let task_mgr = TaskManager::new(ctx.pool());
523 let event_mgr = EventManager::new(ctx.pool());
524
525 let task = task_mgr
526 .add_task("Test task".to_string(), None, None, None, None, None)
527 .await
528 .unwrap();
529 let event = event_mgr
530 .add_event(
531 task.id,
532 "decision".to_string(),
533 "Initial decision".to_string(),
534 )
535 .await
536 .unwrap();
537
538 let updated = event_mgr
540 .update_event(event.id, Some("milestone"), Some("Updated decision"))
541 .await
542 .unwrap();
543
544 assert_eq!(updated.id, event.id);
545 assert_eq!(updated.task_id, task.id);
546 assert_eq!(updated.log_type, "milestone");
547 assert_eq!(updated.discussion_data, "Updated decision");
548 }
549
550 #[tokio::test]
551 async fn test_update_event_partial() {
552 let ctx = TestContext::new().await;
553 let task_mgr = TaskManager::new(ctx.pool());
554 let event_mgr = EventManager::new(ctx.pool());
555
556 let task = task_mgr
557 .add_task("Test task".to_string(), None, None, None, None, None)
558 .await
559 .unwrap();
560 let event = event_mgr
561 .add_event(
562 task.id,
563 "decision".to_string(),
564 "Initial decision".to_string(),
565 )
566 .await
567 .unwrap();
568
569 let updated = event_mgr
571 .update_event(event.id, None, Some("Updated data only"))
572 .await
573 .unwrap();
574
575 assert_eq!(updated.log_type, "decision"); assert_eq!(updated.discussion_data, "Updated data only");
577 }
578
579 #[tokio::test]
580 async fn test_update_event_nonexistent() {
581 let ctx = TestContext::new().await;
582 let event_mgr = EventManager::new(ctx.pool());
583
584 let result = event_mgr
585 .update_event(999, Some("decision"), Some("Test"))
586 .await;
587
588 assert!(result.is_err());
589 assert!(matches!(result, Err(IntentError::InvalidInput(_))));
590 }
591
592 #[tokio::test]
593 async fn test_delete_event() {
594 let ctx = TestContext::new().await;
595 let task_mgr = TaskManager::new(ctx.pool());
596 let event_mgr = EventManager::new(ctx.pool());
597
598 let task = task_mgr
599 .add_task("Test task".to_string(), None, None, None, None, None)
600 .await
601 .unwrap();
602 let event = event_mgr
603 .add_event(task.id, "decision".to_string(), "To be deleted".to_string())
604 .await
605 .unwrap();
606
607 event_mgr.delete_event(event.id).await.unwrap();
609
610 let events = event_mgr
612 .list_events(Some(task.id), None, None, None)
613 .await
614 .unwrap();
615 assert_eq!(events.len(), 0);
616 }
617
618 #[tokio::test]
619 async fn test_delete_event_nonexistent() {
620 let ctx = TestContext::new().await;
621 let event_mgr = EventManager::new(ctx.pool());
622
623 let result = event_mgr.delete_event(999).await;
624 assert!(result.is_err());
625 assert!(matches!(result, Err(IntentError::InvalidInput(_))));
626 }
627
628 #[tokio::test]
629 async fn test_list_events_filter_by_type() {
630 let ctx = TestContext::new().await;
631 let task_mgr = TaskManager::new(ctx.pool());
632 let event_mgr = EventManager::new(ctx.pool());
633
634 let task = task_mgr
635 .add_task("Test task".to_string(), None, None, None, None, None)
636 .await
637 .unwrap();
638
639 event_mgr
641 .add_event(task.id, "decision".to_string(), "Decision 1".to_string())
642 .await
643 .unwrap();
644 event_mgr
645 .add_event(task.id, "blocker".to_string(), "Blocker 1".to_string())
646 .await
647 .unwrap();
648 event_mgr
649 .add_event(task.id, "decision".to_string(), "Decision 2".to_string())
650 .await
651 .unwrap();
652
653 let events = event_mgr
655 .list_events(Some(task.id), None, Some("decision".to_string()), None)
656 .await
657 .unwrap();
658
659 assert_eq!(events.len(), 2);
660 assert!(events.iter().all(|e| e.log_type == "decision"));
661 }
662
663 #[tokio::test]
664 async fn test_list_events_global() {
665 let ctx = TestContext::new().await;
666 let task_mgr = TaskManager::new(ctx.pool());
667 let event_mgr = EventManager::new(ctx.pool());
668
669 let task1 = task_mgr
670 .add_task("Task 1".to_string(), None, None, None, None, None)
671 .await
672 .unwrap();
673 let task2 = task_mgr
674 .add_task("Task 2".to_string(), None, None, None, None, None)
675 .await
676 .unwrap();
677
678 event_mgr
680 .add_event(
681 task1.id,
682 "decision".to_string(),
683 "Task 1 Decision".to_string(),
684 )
685 .await
686 .unwrap();
687 event_mgr
688 .add_event(
689 task2.id,
690 "decision".to_string(),
691 "Task 2 Decision".to_string(),
692 )
693 .await
694 .unwrap();
695
696 let events = event_mgr.list_events(None, None, None, None).await.unwrap();
698
699 assert!(events.len() >= 2); let task1_events: Vec<_> = events.iter().filter(|e| e.task_id == task1.id).collect();
701 let task2_events: Vec<_> = events.iter().filter(|e| e.task_id == task2.id).collect();
702
703 assert_eq!(task1_events.len(), 1);
704 assert_eq!(task2_events.len(), 1);
705 }
706}