1use crate::db::models::Event;
2use crate::error::{IntentError, Result};
3use chrono::Utc;
4use sqlx::{Row, SqlitePool};
5use std::sync::Arc;
6
7pub struct EventManager<'a> {
8 pool: &'a SqlitePool,
9 ws_state: Option<Arc<crate::dashboard::websocket::WebSocketState>>,
10 project_path: Option<String>,
11 mcp_notifier: Option<tokio::sync::mpsc::UnboundedSender<String>>,
12}
13
14impl<'a> EventManager<'a> {
15 pub fn new(pool: &'a SqlitePool) -> Self {
16 Self {
17 pool,
18 ws_state: None,
19 project_path: None,
20 mcp_notifier: None,
21 }
22 }
23
24 pub fn with_mcp_notifier(
26 pool: &'a SqlitePool,
27 project_path: String,
28 mcp_notifier: tokio::sync::mpsc::UnboundedSender<String>,
29 ) -> Self {
30 Self {
31 pool,
32 ws_state: None,
33 project_path: Some(project_path),
34 mcp_notifier: Some(mcp_notifier),
35 }
36 }
37
38 pub fn with_websocket(
40 pool: &'a SqlitePool,
41 ws_state: Arc<crate::dashboard::websocket::WebSocketState>,
42 project_path: String,
43 ) -> Self {
44 Self {
45 pool,
46 ws_state: Some(ws_state),
47 project_path: Some(project_path),
48 mcp_notifier: None,
49 }
50 }
51
52 async fn notify_event_created(&self, event: &Event) {
54 use crate::dashboard::websocket::{DatabaseOperationPayload, ProtocolMessage};
55
56 let event_json = match serde_json::to_value(event) {
58 Ok(json) => json,
59 Err(e) => {
60 tracing::warn!("Failed to serialize event for notification: {}", e);
61 return;
62 },
63 };
64
65 let project_path = match &self.project_path {
66 Some(path) => path.clone(),
67 None => return, };
69
70 let payload =
71 DatabaseOperationPayload::event_created(event.id, event_json, project_path.clone());
72 let msg = ProtocolMessage::new("db_operation", payload);
73 let json = match msg.to_json() {
74 Ok(j) => j,
75 Err(e) => {
76 tracing::warn!("Failed to serialize notification message: {}", e);
77 return;
78 },
79 };
80
81 if let Some(ws) = &self.ws_state {
83 ws.broadcast_to_ui(&json).await;
84 }
85
86 if let Some(notifier) = &self.mcp_notifier {
88 if let Err(e) = notifier.send(json) {
89 tracing::debug!("Failed to send MCP notification (channel closed): {}", e);
90 }
91 }
92 }
93
94 async fn notify_event_updated(&self, event: &Event) {
96 if let (Some(ws), Some(path)) = (&self.ws_state, &self.project_path) {
97 use crate::dashboard::websocket::{DatabaseOperationPayload, ProtocolMessage};
98
99 if let Ok(event_json) = serde_json::to_value(event) {
100 let payload =
101 DatabaseOperationPayload::event_updated(event.id, event_json, path.clone());
102 let msg = ProtocolMessage::new("event_updated", payload);
103 if let Ok(json) = msg.to_json() {
104 ws.broadcast_to_ui(&json).await;
105 }
106 }
107 }
108 }
109
110 async fn notify_event_deleted(&self, event_id: i64) {
112 if let (Some(ws), Some(path)) = (&self.ws_state, &self.project_path) {
113 use crate::dashboard::websocket::{DatabaseOperationPayload, ProtocolMessage};
114
115 let payload = DatabaseOperationPayload::event_deleted(event_id, path.clone());
116 let msg = ProtocolMessage::new("event_deleted", payload);
117 if let Ok(json) = msg.to_json() {
118 ws.broadcast_to_ui(&json).await;
119 }
120 }
121 }
122
123 pub async fn add_event(
125 &self,
126 task_id: i64,
127 log_type: &str,
128 discussion_data: &str,
129 ) -> Result<Event> {
130 let task_exists: bool =
132 sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?)")
133 .bind(task_id)
134 .fetch_one(self.pool)
135 .await?;
136
137 if !task_exists {
138 return Err(IntentError::TaskNotFound(task_id));
139 }
140
141 let now = Utc::now();
142
143 let result = sqlx::query(
144 r#"
145 INSERT INTO events (task_id, log_type, discussion_data, timestamp)
146 VALUES (?, ?, ?, ?)
147 "#,
148 )
149 .bind(task_id)
150 .bind(log_type)
151 .bind(discussion_data)
152 .bind(now)
153 .execute(self.pool)
154 .await?;
155
156 let id = result.last_insert_rowid();
157
158 let event = Event {
159 id,
160 task_id,
161 timestamp: now,
162 log_type: log_type.to_string(),
163 discussion_data: discussion_data.to_string(),
164 };
165
166 self.notify_event_created(&event).await;
168
169 Ok(event)
170 }
171
172 pub async fn update_event(
174 &self,
175 event_id: i64,
176 log_type: Option<&str>,
177 discussion_data: Option<&str>,
178 ) -> Result<Event> {
179 let existing_event: Option<Event> = sqlx::query_as(
181 "SELECT id, task_id, timestamp, log_type, discussion_data FROM events WHERE id = ?",
182 )
183 .bind(event_id)
184 .fetch_optional(self.pool)
185 .await?;
186
187 let existing_event = existing_event.ok_or(IntentError::InvalidInput(format!(
188 "Event {} not found",
189 event_id
190 )))?;
191
192 let new_log_type = log_type.unwrap_or(&existing_event.log_type);
194 let new_discussion_data = discussion_data.unwrap_or(&existing_event.discussion_data);
195
196 sqlx::query(
197 r#"
198 UPDATE events
199 SET log_type = ?, discussion_data = ?
200 WHERE id = ?
201 "#,
202 )
203 .bind(new_log_type)
204 .bind(new_discussion_data)
205 .bind(event_id)
206 .execute(self.pool)
207 .await?;
208
209 let updated_event = Event {
210 id: existing_event.id,
211 task_id: existing_event.task_id,
212 timestamp: existing_event.timestamp,
213 log_type: new_log_type.to_string(),
214 discussion_data: new_discussion_data.to_string(),
215 };
216
217 self.notify_event_updated(&updated_event).await;
219
220 Ok(updated_event)
221 }
222
223 pub async fn delete_event(&self, event_id: i64) -> Result<()> {
225 let event: Option<Event> = sqlx::query_as(
227 "SELECT id, task_id, timestamp, log_type, discussion_data FROM events WHERE id = ?",
228 )
229 .bind(event_id)
230 .fetch_optional(self.pool)
231 .await?;
232
233 let _event = event.ok_or(IntentError::InvalidInput(format!(
234 "Event {} not found",
235 event_id
236 )))?;
237
238 let _ = sqlx::query("DELETE FROM events_fts WHERE rowid = ?")
240 .bind(event_id)
241 .execute(self.pool)
242 .await;
243
244 sqlx::query("DELETE FROM events WHERE id = ?")
246 .bind(event_id)
247 .execute(self.pool)
248 .await?;
249
250 self.notify_event_deleted(event_id).await;
252
253 Ok(())
254 }
255
256 pub async fn list_events(
258 &self,
259 task_id: Option<i64>,
260 limit: Option<i64>,
261 log_type: Option<String>,
262 since: Option<String>,
263 ) -> Result<Vec<Event>> {
264 if let Some(tid) = task_id {
266 let task_exists: bool =
267 sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM tasks WHERE id = ?)")
268 .bind(tid)
269 .fetch_one(self.pool)
270 .await?;
271
272 if !task_exists {
273 return Err(IntentError::TaskNotFound(tid));
274 }
275 }
276
277 let limit = limit.unwrap_or(50);
278
279 let since_timestamp = if let Some(duration_str) = since {
281 Some(Self::parse_duration(&duration_str)?)
282 } else {
283 None
284 };
285
286 let mut query = String::from(
288 "SELECT id, task_id, timestamp, log_type, discussion_data FROM events WHERE 1=1",
289 );
290 let mut conditions = Vec::new();
291
292 if task_id.is_some() {
293 conditions.push("task_id = ?");
294 }
295
296 if log_type.is_some() {
297 conditions.push("log_type = ?");
298 }
299
300 if since_timestamp.is_some() {
301 conditions.push("timestamp >= ?");
302 }
303
304 if !conditions.is_empty() {
305 query.push_str(" AND ");
306 query.push_str(&conditions.join(" AND "));
307 }
308
309 query.push_str(" ORDER BY timestamp DESC LIMIT ?");
310
311 let mut sql_query = sqlx::query_as::<_, Event>(&query);
313
314 if let Some(tid) = task_id {
315 sql_query = sql_query.bind(tid);
316 }
317
318 if let Some(ref typ) = log_type {
319 sql_query = sql_query.bind(typ);
320 }
321
322 if let Some(ts) = since_timestamp {
323 sql_query = sql_query.bind(ts);
324 }
325
326 sql_query = sql_query.bind(limit);
327
328 let events = sql_query.fetch_all(self.pool).await?;
329
330 Ok(events)
331 }
332
333 fn parse_duration(duration: &str) -> Result<chrono::DateTime<Utc>> {
335 let len = duration.len();
336 if len < 2 {
337 return Err(IntentError::InvalidInput(
338 "Duration must be in format like '7d', '24h', or '30m'".to_string(),
339 ));
340 }
341
342 let (num_str, unit) = duration.split_at(len - 1);
343 let num: i64 = num_str.parse().map_err(|_| {
344 IntentError::InvalidInput(format!("Invalid number in duration: {}", num_str))
345 })?;
346
347 let now = Utc::now();
348 let result = match unit {
349 "d" => now - chrono::Duration::days(num),
350 "h" => now - chrono::Duration::hours(num),
351 "m" => now - chrono::Duration::minutes(num),
352 "s" => now - chrono::Duration::seconds(num),
353 _ => {
354 return Err(IntentError::InvalidInput(format!(
355 "Invalid duration unit '{}'. Use 'd' (days), 'h' (hours), 'm' (minutes), or 's' (seconds)",
356 unit
357 )))
358 }
359 };
360
361 Ok(result)
362 }
363
364 pub async fn search_events_fts5(
366 &self,
367 query: &str,
368 limit: Option<i64>,
369 ) -> Result<Vec<EventSearchResult>> {
370 let limit = limit.unwrap_or(20);
371
372 let results = sqlx::query(
374 r#"
375 SELECT
376 e.id,
377 e.task_id,
378 e.timestamp,
379 e.log_type,
380 e.discussion_data,
381 snippet(events_fts, 0, '**', '**', '...', 15) as match_snippet
382 FROM events_fts
383 INNER JOIN events e ON events_fts.rowid = e.id
384 WHERE events_fts MATCH ?
385 ORDER BY rank
386 LIMIT ?
387 "#,
388 )
389 .bind(query)
390 .bind(limit)
391 .fetch_all(self.pool)
392 .await?;
393
394 let mut search_results = Vec::new();
395 for row in results {
396 let event = Event {
397 id: row.get("id"),
398 task_id: row.get("task_id"),
399 timestamp: row.get("timestamp"),
400 log_type: row.get("log_type"),
401 discussion_data: row.get("discussion_data"),
402 };
403 let match_snippet: String = row.get("match_snippet");
404
405 search_results.push(EventSearchResult {
406 event,
407 match_snippet,
408 });
409 }
410
411 Ok(search_results)
412 }
413}
414
415#[derive(Debug)]
417pub struct EventSearchResult {
418 pub event: Event,
419 pub match_snippet: String,
420}
421
422#[cfg(test)]
423mod tests {
424 use super::*;
425 use crate::tasks::TaskManager;
426 use crate::test_utils::test_helpers::TestContext;
427
428 #[tokio::test]
429 async fn test_add_event() {
430 let ctx = TestContext::new().await;
431 let task_mgr = TaskManager::new(ctx.pool());
432 let event_mgr = EventManager::new(ctx.pool());
433
434 let task = task_mgr.add_task("Test task", None, None).await.unwrap();
435 let event = event_mgr
436 .add_event(task.id, "decision", "Test decision")
437 .await
438 .unwrap();
439
440 assert_eq!(event.task_id, task.id);
441 assert_eq!(event.log_type, "decision");
442 assert_eq!(event.discussion_data, "Test decision");
443 }
444
445 #[tokio::test]
446 async fn test_add_event_nonexistent_task() {
447 let ctx = TestContext::new().await;
448 let event_mgr = EventManager::new(ctx.pool());
449
450 let result = event_mgr.add_event(999, "decision", "Test").await;
451 assert!(matches!(result, Err(IntentError::TaskNotFound(999))));
452 }
453
454 #[tokio::test]
455 async fn test_list_events() {
456 let ctx = TestContext::new().await;
457 let task_mgr = TaskManager::new(ctx.pool());
458 let event_mgr = EventManager::new(ctx.pool());
459
460 let task = task_mgr.add_task("Test task", None, None).await.unwrap();
461
462 event_mgr
464 .add_event(task.id, "decision", "Decision 1")
465 .await
466 .unwrap();
467 event_mgr
468 .add_event(task.id, "blocker", "Blocker 1")
469 .await
470 .unwrap();
471 event_mgr
472 .add_event(task.id, "milestone", "Milestone 1")
473 .await
474 .unwrap();
475
476 let events = event_mgr
477 .list_events(Some(task.id), None, None, None)
478 .await
479 .unwrap();
480 assert_eq!(events.len(), 3);
481
482 assert_eq!(events[0].log_type, "milestone");
484 assert_eq!(events[1].log_type, "blocker");
485 assert_eq!(events[2].log_type, "decision");
486 }
487
488 #[tokio::test]
489 async fn test_list_events_with_limit() {
490 let ctx = TestContext::new().await;
491 let task_mgr = TaskManager::new(ctx.pool());
492 let event_mgr = EventManager::new(ctx.pool());
493
494 let task = task_mgr.add_task("Test task", None, None).await.unwrap();
495
496 for i in 0..5 {
498 event_mgr
499 .add_event(task.id, "test", &format!("Event {}", i))
500 .await
501 .unwrap();
502 }
503
504 let events = event_mgr
505 .list_events(Some(task.id), Some(3), None, None)
506 .await
507 .unwrap();
508 assert_eq!(events.len(), 3);
509 }
510
511 #[tokio::test]
512 async fn test_list_events_nonexistent_task() {
513 let ctx = TestContext::new().await;
514 let event_mgr = EventManager::new(ctx.pool());
515
516 let result = event_mgr.list_events(Some(999), None, None, None).await;
517 assert!(matches!(result, Err(IntentError::TaskNotFound(999))));
518 }
519
520 #[tokio::test]
521 async fn test_list_events_empty() {
522 let ctx = TestContext::new().await;
523 let task_mgr = TaskManager::new(ctx.pool());
524 let event_mgr = EventManager::new(ctx.pool());
525
526 let task = task_mgr.add_task("Test task", None, None).await.unwrap();
527
528 let events = event_mgr
529 .list_events(Some(task.id), None, None, None)
530 .await
531 .unwrap();
532 assert_eq!(events.len(), 0);
533 }
534}