1use crate::db::models::Event;
2use crate::error::{IntentError, Result};
3use chrono::Utc;
4use sqlx::{Row, SqlitePool};
5use std::sync::Arc;
6
7pub struct EventManager<'a> {
8 pool: &'a SqlitePool,
9 notifier: crate::notifications::NotificationSender,
10 project_path: Option<String>,
11}
12
13impl<'a> EventManager<'a> {
14 pub fn new(pool: &'a SqlitePool) -> Self {
15 Self {
16 pool,
17 notifier: crate::notifications::NotificationSender::new(None, None),
18 project_path: None,
19 }
20 }
21
22 pub fn with_mcp_notifier(
24 pool: &'a SqlitePool,
25 project_path: String,
26 mcp_notifier: tokio::sync::mpsc::UnboundedSender<String>,
27 ) -> Self {
28 Self {
29 pool,
30 notifier: crate::notifications::NotificationSender::new(None, Some(mcp_notifier)),
31 project_path: Some(project_path),
32 }
33 }
34
35 pub fn with_websocket(
37 pool: &'a SqlitePool,
38 ws_state: Arc<crate::dashboard::websocket::WebSocketState>,
39 project_path: String,
40 ) -> Self {
41 Self {
42 pool,
43 notifier: crate::notifications::NotificationSender::new(Some(ws_state), None),
44 project_path: Some(project_path),
45 }
46 }
47
48 async fn notify_event_created(&self, event: &Event) {
50 use crate::dashboard::websocket::DatabaseOperationPayload;
51
52 let Some(project_path) = &self.project_path else {
53 return;
54 };
55
56 let event_json = match serde_json::to_value(event) {
57 Ok(json) => json,
58 Err(e) => {
59 tracing::warn!("Failed to serialize event for notification: {}", e);
60 return;
61 },
62 };
63
64 let payload =
65 DatabaseOperationPayload::event_created(event.id, event_json, project_path.clone());
66 self.notifier.send(payload).await;
67 }
68
69 async fn notify_event_updated(&self, event: &Event) {
71 use crate::dashboard::websocket::DatabaseOperationPayload;
72
73 let Some(project_path) = &self.project_path else {
74 return;
75 };
76
77 let event_json = match serde_json::to_value(event) {
78 Ok(json) => json,
79 Err(e) => {
80 tracing::warn!("Failed to serialize event for notification: {}", e);
81 return;
82 },
83 };
84
85 let payload =
86 DatabaseOperationPayload::event_updated(event.id, event_json, project_path.clone());
87 self.notifier.send(payload).await;
88 }
89
90 async fn notify_event_deleted(&self, event_id: i64) {
92 use crate::dashboard::websocket::DatabaseOperationPayload;
93
94 let Some(project_path) = &self.project_path else {
95 return;
96 };
97
98 let payload = DatabaseOperationPayload::event_deleted(event_id, project_path.clone());
99 self.notifier.send(payload).await;
100 }
101
102 pub async fn add_event(
104 &self,
105 task_id: i64,
106 log_type: &str,
107 discussion_data: &str,
108 ) -> Result<Event> {
109 let task_exists: bool = sqlx::query_scalar(crate::sql_constants::CHECK_TASK_EXISTS)
111 .bind(task_id)
112 .fetch_one(self.pool)
113 .await?;
114
115 if !task_exists {
116 return Err(IntentError::TaskNotFound(task_id));
117 }
118
119 let now = Utc::now();
120
121 let result = sqlx::query(
122 r#"
123 INSERT INTO events (task_id, log_type, discussion_data, timestamp)
124 VALUES (?, ?, ?, ?)
125 "#,
126 )
127 .bind(task_id)
128 .bind(log_type)
129 .bind(discussion_data)
130 .bind(now)
131 .execute(self.pool)
132 .await?;
133
134 let id = result.last_insert_rowid();
135
136 let event = Event {
137 id,
138 task_id,
139 timestamp: now,
140 log_type: log_type.to_string(),
141 discussion_data: discussion_data.to_string(),
142 };
143
144 self.notify_event_created(&event).await;
146
147 Ok(event)
148 }
149
150 pub async fn update_event(
152 &self,
153 event_id: i64,
154 log_type: Option<&str>,
155 discussion_data: Option<&str>,
156 ) -> Result<Event> {
157 let existing_event: Option<Event> =
159 sqlx::query_as(crate::sql_constants::SELECT_EVENT_BY_ID)
160 .bind(event_id)
161 .fetch_optional(self.pool)
162 .await?;
163
164 let existing_event = existing_event.ok_or(IntentError::InvalidInput(format!(
165 "Event {} not found",
166 event_id
167 )))?;
168
169 let new_log_type = log_type.unwrap_or(&existing_event.log_type);
171 let new_discussion_data = discussion_data.unwrap_or(&existing_event.discussion_data);
172
173 sqlx::query(
174 r#"
175 UPDATE events
176 SET log_type = ?, discussion_data = ?
177 WHERE id = ?
178 "#,
179 )
180 .bind(new_log_type)
181 .bind(new_discussion_data)
182 .bind(event_id)
183 .execute(self.pool)
184 .await?;
185
186 let updated_event = Event {
187 id: existing_event.id,
188 task_id: existing_event.task_id,
189 timestamp: existing_event.timestamp,
190 log_type: new_log_type.to_string(),
191 discussion_data: new_discussion_data.to_string(),
192 };
193
194 self.notify_event_updated(&updated_event).await;
196
197 Ok(updated_event)
198 }
199
200 pub async fn delete_event(&self, event_id: i64) -> Result<()> {
202 let event: Option<Event> = sqlx::query_as(crate::sql_constants::SELECT_EVENT_BY_ID)
204 .bind(event_id)
205 .fetch_optional(self.pool)
206 .await?;
207
208 let _event = event.ok_or(IntentError::InvalidInput(format!(
209 "Event {} not found",
210 event_id
211 )))?;
212
213 let _ = sqlx::query("DELETE FROM events_fts WHERE rowid = ?")
215 .bind(event_id)
216 .execute(self.pool)
217 .await;
218
219 sqlx::query("DELETE FROM events WHERE id = ?")
221 .bind(event_id)
222 .execute(self.pool)
223 .await?;
224
225 self.notify_event_deleted(event_id).await;
227
228 Ok(())
229 }
230
231 pub async fn list_events(
233 &self,
234 task_id: Option<i64>,
235 limit: Option<i64>,
236 log_type: Option<String>,
237 since: Option<String>,
238 ) -> Result<Vec<Event>> {
239 if let Some(tid) = task_id {
241 let task_exists: bool = sqlx::query_scalar(crate::sql_constants::CHECK_TASK_EXISTS)
242 .bind(tid)
243 .fetch_one(self.pool)
244 .await?;
245
246 if !task_exists {
247 return Err(IntentError::TaskNotFound(tid));
248 }
249 }
250
251 let limit = limit.unwrap_or(50);
252
253 let since_timestamp = if let Some(duration_str) = since {
255 Some(crate::time_utils::parse_duration(&duration_str)?)
256 } else {
257 None
258 };
259
260 let mut query = String::from(crate::sql_constants::SELECT_EVENT_BASE);
262 let mut conditions = Vec::new();
263
264 if task_id.is_some() {
265 conditions.push("task_id = ?");
266 }
267
268 if log_type.is_some() {
269 conditions.push("log_type = ?");
270 }
271
272 if since_timestamp.is_some() {
273 conditions.push("timestamp >= ?");
274 }
275
276 if !conditions.is_empty() {
277 query.push_str(" AND ");
278 query.push_str(&conditions.join(" AND "));
279 }
280
281 query.push_str(" ORDER BY timestamp DESC LIMIT ?");
282
283 let mut sql_query = sqlx::query_as::<_, Event>(&query);
285
286 if let Some(tid) = task_id {
287 sql_query = sql_query.bind(tid);
288 }
289
290 if let Some(ref typ) = log_type {
291 sql_query = sql_query.bind(typ);
292 }
293
294 if let Some(ts) = since_timestamp {
295 sql_query = sql_query.bind(ts);
296 }
297
298 sql_query = sql_query.bind(limit);
299
300 let events = sql_query.fetch_all(self.pool).await?;
301
302 Ok(events)
303 }
304
305 pub async fn search_events_fts5(
307 &self,
308 query: &str,
309 limit: Option<i64>,
310 ) -> Result<Vec<EventSearchResult>> {
311 let limit = limit.unwrap_or(20);
312
313 let results = sqlx::query(
315 r#"
316 SELECT
317 e.id,
318 e.task_id,
319 e.timestamp,
320 e.log_type,
321 e.discussion_data,
322 snippet(events_fts, 0, '**', '**', '...', 15) as match_snippet
323 FROM events_fts
324 INNER JOIN events e ON events_fts.rowid = e.id
325 WHERE events_fts MATCH ?
326 ORDER BY rank
327 LIMIT ?
328 "#,
329 )
330 .bind(query)
331 .bind(limit)
332 .fetch_all(self.pool)
333 .await?;
334
335 let mut search_results = Vec::new();
336 for row in results {
337 let event = Event {
338 id: row.get("id"),
339 task_id: row.get("task_id"),
340 timestamp: row.get("timestamp"),
341 log_type: row.get("log_type"),
342 discussion_data: row.get("discussion_data"),
343 };
344 let match_snippet: String = row.get("match_snippet");
345
346 search_results.push(EventSearchResult {
347 event,
348 match_snippet,
349 });
350 }
351
352 Ok(search_results)
353 }
354}
355
356#[derive(Debug)]
358pub struct EventSearchResult {
359 pub event: Event,
360 pub match_snippet: String,
361}
362
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tasks::TaskManager;
    use crate::test_utils::test_helpers::TestContext;

    /// A freshly added event echoes back the task id, type, and data it was given.
    #[tokio::test]
    async fn test_add_event() {
        let ctx = TestContext::new().await;
        let tasks = TaskManager::new(ctx.pool());
        let manager = EventManager::new(ctx.pool());

        let task = tasks.add_task("Test task", None, None).await.unwrap();
        let created = manager
            .add_event(task.id, "decision", "Test decision")
            .await
            .unwrap();

        assert_eq!(created.task_id, task.id);
        assert_eq!(created.log_type, "decision");
        assert_eq!(created.discussion_data, "Test decision");
    }

    /// Adding an event to a task that does not exist is rejected.
    #[tokio::test]
    async fn test_add_event_nonexistent_task() {
        let ctx = TestContext::new().await;
        let manager = EventManager::new(ctx.pool());

        let outcome = manager.add_event(999, "decision", "Test").await;

        assert!(matches!(outcome, Err(IntentError::TaskNotFound(999))));
    }

    /// Events for a task come back newest first.
    #[tokio::test]
    async fn test_list_events() {
        let ctx = TestContext::new().await;
        let tasks = TaskManager::new(ctx.pool());
        let manager = EventManager::new(ctx.pool());

        let task = tasks.add_task("Test task", None, None).await.unwrap();

        for (kind, data) in [
            ("decision", "Decision 1"),
            ("blocker", "Blocker 1"),
            ("milestone", "Milestone 1"),
        ] {
            manager.add_event(task.id, kind, data).await.unwrap();
        }

        let listed = manager
            .list_events(Some(task.id), None, None, None)
            .await
            .unwrap();
        assert_eq!(listed.len(), 3);

        // Insertion order was decision, blocker, milestone; listing reverses it.
        assert_eq!(listed[0].log_type, "milestone");
        assert_eq!(listed[1].log_type, "blocker");
        assert_eq!(listed[2].log_type, "decision");
    }

    /// The `limit` argument caps the number of returned rows.
    #[tokio::test]
    async fn test_list_events_with_limit() {
        let ctx = TestContext::new().await;
        let tasks = TaskManager::new(ctx.pool());
        let manager = EventManager::new(ctx.pool());

        let task = tasks.add_task("Test task", None, None).await.unwrap();

        for label in (0..5).map(|i| format!("Event {}", i)) {
            manager.add_event(task.id, "test", &label).await.unwrap();
        }

        let listed = manager
            .list_events(Some(task.id), Some(3), None, None)
            .await
            .unwrap();
        assert_eq!(listed.len(), 3);
    }

    /// Listing with a task filter for an unknown task is rejected.
    #[tokio::test]
    async fn test_list_events_nonexistent_task() {
        let ctx = TestContext::new().await;
        let manager = EventManager::new(ctx.pool());

        let outcome = manager.list_events(Some(999), None, None, None).await;

        assert!(matches!(outcome, Err(IntentError::TaskNotFound(999))));
    }

    /// A task without events yields an empty list.
    #[tokio::test]
    async fn test_list_events_empty() {
        let ctx = TestContext::new().await;
        let tasks = TaskManager::new(ctx.pool());
        let manager = EventManager::new(ctx.pool());

        let task = tasks.add_task("Test task", None, None).await.unwrap();

        let listed = manager
            .list_events(Some(task.id), None, None, None)
            .await
            .unwrap();
        assert_eq!(listed.len(), 0);
    }

    /// Updating both fields replaces both and keeps the id and task id.
    #[tokio::test]
    async fn test_update_event() {
        let ctx = TestContext::new().await;
        let tasks = TaskManager::new(ctx.pool());
        let manager = EventManager::new(ctx.pool());

        let task = tasks.add_task("Test task", None, None).await.unwrap();
        let original = manager
            .add_event(task.id, "decision", "Initial decision")
            .await
            .unwrap();

        let updated = manager
            .update_event(original.id, Some("milestone"), Some("Updated decision"))
            .await
            .unwrap();

        assert_eq!(updated.id, original.id);
        assert_eq!(updated.task_id, task.id);
        assert_eq!(updated.log_type, "milestone");
        assert_eq!(updated.discussion_data, "Updated decision");
    }

    /// Passing `None` for a field leaves the stored value untouched.
    #[tokio::test]
    async fn test_update_event_partial() {
        let ctx = TestContext::new().await;
        let tasks = TaskManager::new(ctx.pool());
        let manager = EventManager::new(ctx.pool());

        let task = tasks.add_task("Test task", None, None).await.unwrap();
        let original = manager
            .add_event(task.id, "decision", "Initial decision")
            .await
            .unwrap();

        let updated = manager
            .update_event(original.id, None, Some("Updated data only"))
            .await
            .unwrap();

        // The type was not supplied, so it must be unchanged.
        assert_eq!(updated.log_type, "decision");
        assert_eq!(updated.discussion_data, "Updated data only");
    }

    /// Updating an unknown event reports `InvalidInput`.
    #[tokio::test]
    async fn test_update_event_nonexistent() {
        let ctx = TestContext::new().await;
        let manager = EventManager::new(ctx.pool());

        let outcome = manager
            .update_event(999, Some("decision"), Some("Test"))
            .await;

        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(IntentError::InvalidInput(_))));
    }

    /// A deleted event no longer shows up in the task's listing.
    #[tokio::test]
    async fn test_delete_event() {
        let ctx = TestContext::new().await;
        let tasks = TaskManager::new(ctx.pool());
        let manager = EventManager::new(ctx.pool());

        let task = tasks.add_task("Test task", None, None).await.unwrap();
        let doomed = manager
            .add_event(task.id, "decision", "To be deleted")
            .await
            .unwrap();

        manager.delete_event(doomed.id).await.unwrap();

        let remaining = manager
            .list_events(Some(task.id), None, None, None)
            .await
            .unwrap();
        assert_eq!(remaining.len(), 0);
    }

    /// Deleting an unknown event reports `InvalidInput`.
    #[tokio::test]
    async fn test_delete_event_nonexistent() {
        let ctx = TestContext::new().await;
        let manager = EventManager::new(ctx.pool());

        let outcome = manager.delete_event(999).await;

        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(IntentError::InvalidInput(_))));
    }

    /// The `log_type` filter returns only events of that type.
    #[tokio::test]
    async fn test_list_events_filter_by_type() {
        let ctx = TestContext::new().await;
        let tasks = TaskManager::new(ctx.pool());
        let manager = EventManager::new(ctx.pool());

        let task = tasks.add_task("Test task", None, None).await.unwrap();

        for (kind, data) in [
            ("decision", "Decision 1"),
            ("blocker", "Blocker 1"),
            ("decision", "Decision 2"),
        ] {
            manager.add_event(task.id, kind, data).await.unwrap();
        }

        let decisions = manager
            .list_events(Some(task.id), None, Some("decision".to_string()), None)
            .await
            .unwrap();

        assert_eq!(decisions.len(), 2);
        assert!(decisions.iter().all(|e| e.log_type == "decision"));
    }

    /// Without a task filter, events from every task are returned.
    #[tokio::test]
    async fn test_list_events_global() {
        let ctx = TestContext::new().await;
        let tasks = TaskManager::new(ctx.pool());
        let manager = EventManager::new(ctx.pool());

        let task1 = tasks.add_task("Task 1", None, None).await.unwrap();
        let task2 = tasks.add_task("Task 2", None, None).await.unwrap();

        manager
            .add_event(task1.id, "decision", "Task 1 Decision")
            .await
            .unwrap();
        manager
            .add_event(task2.id, "decision", "Task 2 Decision")
            .await
            .unwrap();

        let all_events = manager.list_events(None, None, None, None).await.unwrap();

        assert!(all_events.len() >= 2);

        let for_task1 = all_events.iter().filter(|e| e.task_id == task1.id).count();
        let for_task2 = all_events.iter().filter(|e| e.task_id == task2.id).count();

        assert_eq!(for_task1, 1);
        assert_eq!(for_task2, 1);
    }
}