1use crate::db::models::Event;
2use crate::error::{IntentError, Result};
3use chrono::Utc;
4use sqlx::{Row, SqlitePool};
5use std::sync::Arc;
6
7pub struct EventManager<'a> {
8 pool: &'a SqlitePool,
9 notifier: crate::notifications::NotificationSender,
10 cli_notifier: Option<crate::dashboard::cli_notifier::CliNotifier>,
11 project_path: Option<String>,
12}
13
14impl<'a> EventManager<'a> {
15 pub fn new(pool: &'a SqlitePool) -> Self {
16 Self {
17 pool,
18 notifier: crate::notifications::NotificationSender::new(None),
19 cli_notifier: Some(crate::dashboard::cli_notifier::CliNotifier::new()),
20 project_path: None,
21 }
22 }
23
24 pub fn with_project_path(pool: &'a SqlitePool, project_path: String) -> Self {
26 Self {
27 pool,
28 notifier: crate::notifications::NotificationSender::new(None),
29 cli_notifier: Some(crate::dashboard::cli_notifier::CliNotifier::new()),
30 project_path: Some(project_path),
31 }
32 }
33
34 pub fn with_websocket(
36 pool: &'a SqlitePool,
37 ws_state: Arc<crate::dashboard::websocket::WebSocketState>,
38 project_path: String,
39 ) -> Self {
40 Self {
41 pool,
42 notifier: crate::notifications::NotificationSender::new(Some(ws_state)),
43 cli_notifier: None, project_path: Some(project_path),
45 }
46 }
47
48 async fn notify_event_created(&self, event: &Event) {
50 use crate::dashboard::websocket::DatabaseOperationPayload;
51
52 if let Some(project_path) = &self.project_path {
54 let event_json = match serde_json::to_value(event) {
55 Ok(json) => json,
56 Err(e) => {
57 tracing::warn!("Failed to serialize event for notification: {}", e);
58 return;
59 },
60 };
61
62 let payload =
63 DatabaseOperationPayload::event_created(event.id, event_json, project_path.clone());
64 self.notifier.send(payload).await;
65 }
66
67 if let Some(cli_notifier) = &self.cli_notifier {
69 cli_notifier
70 .notify_event_added(event.task_id, event.id, self.project_path.clone())
71 .await;
72 }
73 }
74
75 async fn notify_event_updated(&self, event: &Event) {
77 use crate::dashboard::websocket::DatabaseOperationPayload;
78
79 let Some(project_path) = &self.project_path else {
80 return;
81 };
82
83 let event_json = match serde_json::to_value(event) {
84 Ok(json) => json,
85 Err(e) => {
86 tracing::warn!("Failed to serialize event for notification: {}", e);
87 return;
88 },
89 };
90
91 let payload =
92 DatabaseOperationPayload::event_updated(event.id, event_json, project_path.clone());
93 self.notifier.send(payload).await;
94 }
95
96 async fn notify_event_deleted(&self, event_id: i64) {
98 use crate::dashboard::websocket::DatabaseOperationPayload;
99
100 let Some(project_path) = &self.project_path else {
101 return;
102 };
103
104 let payload = DatabaseOperationPayload::event_deleted(event_id, project_path.clone());
105 self.notifier.send(payload).await;
106 }
107
108 pub async fn add_event(
110 &self,
111 task_id: i64,
112 log_type: &str,
113 discussion_data: &str,
114 ) -> Result<Event> {
115 let task_exists: bool = sqlx::query_scalar(crate::sql_constants::CHECK_TASK_EXISTS)
117 .bind(task_id)
118 .fetch_one(self.pool)
119 .await?;
120
121 if !task_exists {
122 return Err(IntentError::TaskNotFound(task_id));
123 }
124
125 let now = Utc::now();
126
127 let result = sqlx::query(
128 r#"
129 INSERT INTO events (task_id, log_type, discussion_data, timestamp)
130 VALUES (?, ?, ?, ?)
131 "#,
132 )
133 .bind(task_id)
134 .bind(log_type)
135 .bind(discussion_data)
136 .bind(now)
137 .execute(self.pool)
138 .await?;
139
140 let id = result.last_insert_rowid();
141
142 let event = Event {
143 id,
144 task_id,
145 timestamp: now,
146 log_type: log_type.to_string(),
147 discussion_data: discussion_data.to_string(),
148 };
149
150 self.notify_event_created(&event).await;
152
153 Ok(event)
154 }
155
156 pub async fn update_event(
158 &self,
159 event_id: i64,
160 log_type: Option<&str>,
161 discussion_data: Option<&str>,
162 ) -> Result<Event> {
163 let existing_event: Option<Event> =
165 sqlx::query_as(crate::sql_constants::SELECT_EVENT_BY_ID)
166 .bind(event_id)
167 .fetch_optional(self.pool)
168 .await?;
169
170 let existing_event = existing_event.ok_or(IntentError::InvalidInput(format!(
171 "Event {} not found",
172 event_id
173 )))?;
174
175 let new_log_type = log_type.unwrap_or(&existing_event.log_type);
177 let new_discussion_data = discussion_data.unwrap_or(&existing_event.discussion_data);
178
179 sqlx::query(
180 r#"
181 UPDATE events
182 SET log_type = ?, discussion_data = ?
183 WHERE id = ?
184 "#,
185 )
186 .bind(new_log_type)
187 .bind(new_discussion_data)
188 .bind(event_id)
189 .execute(self.pool)
190 .await?;
191
192 let updated_event = Event {
193 id: existing_event.id,
194 task_id: existing_event.task_id,
195 timestamp: existing_event.timestamp,
196 log_type: new_log_type.to_string(),
197 discussion_data: new_discussion_data.to_string(),
198 };
199
200 self.notify_event_updated(&updated_event).await;
202
203 Ok(updated_event)
204 }
205
206 pub async fn delete_event(&self, event_id: i64) -> Result<()> {
208 let event: Option<Event> = sqlx::query_as(crate::sql_constants::SELECT_EVENT_BY_ID)
210 .bind(event_id)
211 .fetch_optional(self.pool)
212 .await?;
213
214 let _event = event.ok_or(IntentError::InvalidInput(format!(
215 "Event {} not found",
216 event_id
217 )))?;
218
219 let _ = sqlx::query("DELETE FROM events_fts WHERE rowid = ?")
221 .bind(event_id)
222 .execute(self.pool)
223 .await;
224
225 sqlx::query("DELETE FROM events WHERE id = ?")
227 .bind(event_id)
228 .execute(self.pool)
229 .await?;
230
231 self.notify_event_deleted(event_id).await;
233
234 Ok(())
235 }
236
237 pub async fn list_events(
239 &self,
240 task_id: Option<i64>,
241 limit: Option<i64>,
242 log_type: Option<String>,
243 since: Option<String>,
244 ) -> Result<Vec<Event>> {
245 if let Some(tid) = task_id {
247 let task_exists: bool = sqlx::query_scalar(crate::sql_constants::CHECK_TASK_EXISTS)
248 .bind(tid)
249 .fetch_one(self.pool)
250 .await?;
251
252 if !task_exists {
253 return Err(IntentError::TaskNotFound(tid));
254 }
255 }
256
257 let limit = limit.unwrap_or(50);
258
259 let since_timestamp = if let Some(duration_str) = since {
261 Some(crate::time_utils::parse_duration(&duration_str)?)
262 } else {
263 None
264 };
265
266 let mut query = String::from(crate::sql_constants::SELECT_EVENT_BASE);
268 let mut conditions = Vec::new();
269
270 if task_id.is_some() {
271 conditions.push("task_id = ?");
272 }
273
274 if log_type.is_some() {
275 conditions.push("log_type = ?");
276 }
277
278 if since_timestamp.is_some() {
279 conditions.push("timestamp >= ?");
280 }
281
282 if !conditions.is_empty() {
283 query.push_str(" AND ");
284 query.push_str(&conditions.join(" AND "));
285 }
286
287 query.push_str(" ORDER BY timestamp DESC LIMIT ?");
288
289 let mut sql_query = sqlx::query_as::<_, Event>(&query);
291
292 if let Some(tid) = task_id {
293 sql_query = sql_query.bind(tid);
294 }
295
296 if let Some(ref typ) = log_type {
297 sql_query = sql_query.bind(typ);
298 }
299
300 if let Some(ts) = since_timestamp {
301 sql_query = sql_query.bind(ts);
302 }
303
304 sql_query = sql_query.bind(limit);
305
306 let events = sql_query.fetch_all(self.pool).await?;
307
308 Ok(events)
309 }
310
311 pub async fn search_events_fts5(
313 &self,
314 query: &str,
315 limit: Option<i64>,
316 ) -> Result<Vec<EventSearchResult>> {
317 let limit = limit.unwrap_or(20);
318
319 let results = sqlx::query(
321 r#"
322 SELECT
323 e.id,
324 e.task_id,
325 e.timestamp,
326 e.log_type,
327 e.discussion_data,
328 snippet(events_fts, 0, '**', '**', '...', 15) as match_snippet
329 FROM events_fts
330 INNER JOIN events e ON events_fts.rowid = e.id
331 WHERE events_fts MATCH ?
332 ORDER BY rank
333 LIMIT ?
334 "#,
335 )
336 .bind(query)
337 .bind(limit)
338 .fetch_all(self.pool)
339 .await?;
340
341 let mut search_results = Vec::new();
342 for row in results {
343 let event = Event {
344 id: row.get("id"),
345 task_id: row.get("task_id"),
346 timestamp: row.get("timestamp"),
347 log_type: row.get("log_type"),
348 discussion_data: row.get("discussion_data"),
349 };
350 let match_snippet: String = row.get("match_snippet");
351
352 search_results.push(EventSearchResult {
353 event,
354 match_snippet,
355 });
356 }
357
358 Ok(search_results)
359 }
360}
361
362#[derive(Debug)]
364pub struct EventSearchResult {
365 pub event: Event,
366 pub match_snippet: String,
367}
368
369#[cfg(test)]
370mod tests {
371 use super::*;
372 use crate::tasks::TaskManager;
373 use crate::test_utils::test_helpers::TestContext;
374
375 #[tokio::test]
376 async fn test_add_event() {
377 let ctx = TestContext::new().await;
378 let task_mgr = TaskManager::new(ctx.pool());
379 let event_mgr = EventManager::new(ctx.pool());
380
381 let task = task_mgr
382 .add_task("Test task", None, None, None)
383 .await
384 .unwrap();
385 let event = event_mgr
386 .add_event(task.id, "decision", "Test decision")
387 .await
388 .unwrap();
389
390 assert_eq!(event.task_id, task.id);
391 assert_eq!(event.log_type, "decision");
392 assert_eq!(event.discussion_data, "Test decision");
393 }
394
395 #[tokio::test]
396 async fn test_add_event_nonexistent_task() {
397 let ctx = TestContext::new().await;
398 let event_mgr = EventManager::new(ctx.pool());
399
400 let result = event_mgr.add_event(999, "decision", "Test").await;
401 assert!(matches!(result, Err(IntentError::TaskNotFound(999))));
402 }
403
404 #[tokio::test]
405 async fn test_list_events() {
406 let ctx = TestContext::new().await;
407 let task_mgr = TaskManager::new(ctx.pool());
408 let event_mgr = EventManager::new(ctx.pool());
409
410 let task = task_mgr
411 .add_task("Test task", None, None, None)
412 .await
413 .unwrap();
414
415 event_mgr
417 .add_event(task.id, "decision", "Decision 1")
418 .await
419 .unwrap();
420 event_mgr
421 .add_event(task.id, "blocker", "Blocker 1")
422 .await
423 .unwrap();
424 event_mgr
425 .add_event(task.id, "milestone", "Milestone 1")
426 .await
427 .unwrap();
428
429 let events = event_mgr
430 .list_events(Some(task.id), None, None, None)
431 .await
432 .unwrap();
433 assert_eq!(events.len(), 3);
434
435 assert_eq!(events[0].log_type, "milestone");
437 assert_eq!(events[1].log_type, "blocker");
438 assert_eq!(events[2].log_type, "decision");
439 }
440
441 #[tokio::test]
442 async fn test_list_events_with_limit() {
443 let ctx = TestContext::new().await;
444 let task_mgr = TaskManager::new(ctx.pool());
445 let event_mgr = EventManager::new(ctx.pool());
446
447 let task = task_mgr
448 .add_task("Test task", None, None, None)
449 .await
450 .unwrap();
451
452 for i in 0..5 {
454 event_mgr
455 .add_event(task.id, "test", &format!("Event {}", i))
456 .await
457 .unwrap();
458 }
459
460 let events = event_mgr
461 .list_events(Some(task.id), Some(3), None, None)
462 .await
463 .unwrap();
464 assert_eq!(events.len(), 3);
465 }
466
467 #[tokio::test]
468 async fn test_list_events_nonexistent_task() {
469 let ctx = TestContext::new().await;
470 let event_mgr = EventManager::new(ctx.pool());
471
472 let result = event_mgr.list_events(Some(999), None, None, None).await;
473 assert!(matches!(result, Err(IntentError::TaskNotFound(999))));
474 }
475
476 #[tokio::test]
477 async fn test_list_events_empty() {
478 let ctx = TestContext::new().await;
479 let task_mgr = TaskManager::new(ctx.pool());
480 let event_mgr = EventManager::new(ctx.pool());
481
482 let task = task_mgr
483 .add_task("Test task", None, None, None)
484 .await
485 .unwrap();
486
487 let events = event_mgr
488 .list_events(Some(task.id), None, None, None)
489 .await
490 .unwrap();
491 assert_eq!(events.len(), 0);
492 }
493
494 #[tokio::test]
495 async fn test_update_event() {
496 let ctx = TestContext::new().await;
497 let task_mgr = TaskManager::new(ctx.pool());
498 let event_mgr = EventManager::new(ctx.pool());
499
500 let task = task_mgr
501 .add_task("Test task", None, None, None)
502 .await
503 .unwrap();
504 let event = event_mgr
505 .add_event(task.id, "decision", "Initial decision")
506 .await
507 .unwrap();
508
509 let updated = event_mgr
511 .update_event(event.id, Some("milestone"), Some("Updated decision"))
512 .await
513 .unwrap();
514
515 assert_eq!(updated.id, event.id);
516 assert_eq!(updated.task_id, task.id);
517 assert_eq!(updated.log_type, "milestone");
518 assert_eq!(updated.discussion_data, "Updated decision");
519 }
520
521 #[tokio::test]
522 async fn test_update_event_partial() {
523 let ctx = TestContext::new().await;
524 let task_mgr = TaskManager::new(ctx.pool());
525 let event_mgr = EventManager::new(ctx.pool());
526
527 let task = task_mgr
528 .add_task("Test task", None, None, None)
529 .await
530 .unwrap();
531 let event = event_mgr
532 .add_event(task.id, "decision", "Initial decision")
533 .await
534 .unwrap();
535
536 let updated = event_mgr
538 .update_event(event.id, None, Some("Updated data only"))
539 .await
540 .unwrap();
541
542 assert_eq!(updated.log_type, "decision"); assert_eq!(updated.discussion_data, "Updated data only");
544 }
545
546 #[tokio::test]
547 async fn test_update_event_nonexistent() {
548 let ctx = TestContext::new().await;
549 let event_mgr = EventManager::new(ctx.pool());
550
551 let result = event_mgr
552 .update_event(999, Some("decision"), Some("Test"))
553 .await;
554
555 assert!(result.is_err());
556 assert!(matches!(result, Err(IntentError::InvalidInput(_))));
557 }
558
559 #[tokio::test]
560 async fn test_delete_event() {
561 let ctx = TestContext::new().await;
562 let task_mgr = TaskManager::new(ctx.pool());
563 let event_mgr = EventManager::new(ctx.pool());
564
565 let task = task_mgr
566 .add_task("Test task", None, None, None)
567 .await
568 .unwrap();
569 let event = event_mgr
570 .add_event(task.id, "decision", "To be deleted")
571 .await
572 .unwrap();
573
574 event_mgr.delete_event(event.id).await.unwrap();
576
577 let events = event_mgr
579 .list_events(Some(task.id), None, None, None)
580 .await
581 .unwrap();
582 assert_eq!(events.len(), 0);
583 }
584
585 #[tokio::test]
586 async fn test_delete_event_nonexistent() {
587 let ctx = TestContext::new().await;
588 let event_mgr = EventManager::new(ctx.pool());
589
590 let result = event_mgr.delete_event(999).await;
591 assert!(result.is_err());
592 assert!(matches!(result, Err(IntentError::InvalidInput(_))));
593 }
594
595 #[tokio::test]
596 async fn test_list_events_filter_by_type() {
597 let ctx = TestContext::new().await;
598 let task_mgr = TaskManager::new(ctx.pool());
599 let event_mgr = EventManager::new(ctx.pool());
600
601 let task = task_mgr
602 .add_task("Test task", None, None, None)
603 .await
604 .unwrap();
605
606 event_mgr
608 .add_event(task.id, "decision", "Decision 1")
609 .await
610 .unwrap();
611 event_mgr
612 .add_event(task.id, "blocker", "Blocker 1")
613 .await
614 .unwrap();
615 event_mgr
616 .add_event(task.id, "decision", "Decision 2")
617 .await
618 .unwrap();
619
620 let events = event_mgr
622 .list_events(Some(task.id), None, Some("decision".to_string()), None)
623 .await
624 .unwrap();
625
626 assert_eq!(events.len(), 2);
627 assert!(events.iter().all(|e| e.log_type == "decision"));
628 }
629
630 #[tokio::test]
631 async fn test_list_events_global() {
632 let ctx = TestContext::new().await;
633 let task_mgr = TaskManager::new(ctx.pool());
634 let event_mgr = EventManager::new(ctx.pool());
635
636 let task1 = task_mgr.add_task("Task 1", None, None, None).await.unwrap();
637 let task2 = task_mgr.add_task("Task 2", None, None, None).await.unwrap();
638
639 event_mgr
641 .add_event(task1.id, "decision", "Task 1 Decision")
642 .await
643 .unwrap();
644 event_mgr
645 .add_event(task2.id, "decision", "Task 2 Decision")
646 .await
647 .unwrap();
648
649 let events = event_mgr.list_events(None, None, None, None).await.unwrap();
651
652 assert!(events.len() >= 2); let task1_events: Vec<_> = events.iter().filter(|e| e.task_id == task1.id).collect();
654 let task2_events: Vec<_> = events.iter().filter(|e| e.task_id == task2.id).collect();
655
656 assert_eq!(task1_events.len(), 1);
657 assert_eq!(task2_events.len(), 1);
658 }
659}