forge_runtime/workflow/
event_store.rs1use chrono::{DateTime, Utc};
2use sqlx::PgPool;
3use uuid::Uuid;
4
5use forge_core::workflow::{WorkflowEvent, WorkflowEventSender};
6use forge_core::{ForgeError, Result};
7
8pub struct EventStore {
10 pool: PgPool,
11}
12
13impl EventStore {
14 pub fn new(pool: PgPool) -> Self {
16 Self { pool }
17 }
18
19 pub async fn send_event(
21 &self,
22 event_name: &str,
23 correlation_id: &str,
24 payload: Option<serde_json::Value>,
25 ) -> Result<Uuid> {
26 let id = Uuid::new_v4();
27
28 sqlx::query!(
29 r#"
30 INSERT INTO forge_workflow_events (id, event_name, correlation_id, payload)
31 VALUES ($1, $2, $3, $4)
32 "#,
33 id,
34 event_name,
35 correlation_id,
36 payload as _,
37 )
38 .execute(&self.pool)
39 .await
40 .map_err(|e| ForgeError::Database(e.to_string()))?;
41
42 sqlx::query_scalar!(
44 "SELECT pg_notify('forge_workflow_events', $1)",
45 format!("{}:{}", event_name, correlation_id),
46 )
47 .fetch_one(&self.pool)
48 .await
49 .map_err(|e| ForgeError::Database(e.to_string()))?;
50
51 tracing::debug!(
52 event_id = %id,
53 event_name = %event_name,
54 correlation_id = %correlation_id,
55 "Workflow event sent"
56 );
57
58 Ok(id)
59 }
60
61 #[allow(clippy::type_complexity)]
63 pub async fn consume_event(
64 &self,
65 event_name: &str,
66 correlation_id: &str,
67 workflow_run_id: Uuid,
68 ) -> Result<Option<WorkflowEvent>> {
69 let result = sqlx::query!(
70 r#"
71 UPDATE forge_workflow_events
72 SET consumed_at = NOW(), consumed_by = $3
73 WHERE id = (
74 SELECT id FROM forge_workflow_events
75 WHERE event_name = $1 AND correlation_id = $2 AND consumed_at IS NULL
76 ORDER BY created_at ASC LIMIT 1
77 FOR UPDATE SKIP LOCKED
78 )
79 RETURNING id, event_name, correlation_id, payload, created_at
80 "#,
81 event_name,
82 correlation_id,
83 workflow_run_id
84 )
85 .fetch_optional(&self.pool)
86 .await
87 .map_err(|e| ForgeError::Database(e.to_string()))?;
88
89 Ok(result.map(|row| WorkflowEvent {
90 id: row.id,
91 event_name: row.event_name,
92 correlation_id: row.correlation_id,
93 payload: row.payload,
94 created_at: row.created_at,
95 }))
96 }
97
98 pub async fn has_event(&self, event_name: &str, correlation_id: &str) -> Result<bool> {
100 let result = sqlx::query_scalar!(
101 r#"
102 SELECT COUNT(*) FROM forge_workflow_events
103 WHERE event_name = $1 AND correlation_id = $2 AND consumed_at IS NULL
104 "#,
105 event_name,
106 correlation_id
107 )
108 .fetch_one(&self.pool)
109 .await
110 .map_err(|e| ForgeError::Database(e.to_string()))?;
111
112 Ok(result.unwrap_or(0) > 0)
113 }
114
115 #[allow(clippy::type_complexity)]
117 pub async fn list_pending_events(&self, correlation_id: &str) -> Result<Vec<WorkflowEvent>> {
118 let results = sqlx::query!(
119 r#"
120 SELECT id, event_name, correlation_id, payload, created_at
121 FROM forge_workflow_events
122 WHERE correlation_id = $1 AND consumed_at IS NULL
123 ORDER BY created_at ASC
124 "#,
125 correlation_id
126 )
127 .fetch_all(&self.pool)
128 .await
129 .map_err(|e| ForgeError::Database(e.to_string()))?;
130
131 Ok(results
132 .into_iter()
133 .map(|row| WorkflowEvent {
134 id: row.id,
135 event_name: row.event_name,
136 correlation_id: row.correlation_id,
137 payload: row.payload,
138 created_at: row.created_at,
139 })
140 .collect())
141 }
142
143 pub async fn cleanup_consumed_events(&self, older_than: DateTime<Utc>) -> Result<u64> {
145 let result = sqlx::query!(
146 r#"
147 DELETE FROM forge_workflow_events
148 WHERE consumed_at IS NOT NULL AND consumed_at < $1
149 "#,
150 older_than,
151 )
152 .execute(&self.pool)
153 .await
154 .map_err(|e| ForgeError::Database(e.to_string()))?;
155
156 Ok(result.rows_affected())
157 }
158}
159
160impl WorkflowEventSender for EventStore {
161 async fn send_event(
162 &self,
163 event_name: &str,
164 correlation_id: &str,
165 payload: Option<serde_json::Value>,
166 ) -> Result<Uuid> {
167 EventStore::send_event(self, event_name, correlation_id, payload).await
168 }
169}
170
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: an `EventStore` can be constructed from a lazily-connected pool.
    #[tokio::test]
    async fn test_event_store_creation() {
        // A lazy pool opens no connection up front, so no database has to be
        // reachable for this test to pass.
        let options = sqlx::postgres::PgPoolOptions::new().max_connections(1);
        let lazy_pool = options
            .connect_lazy("postgres://localhost/test")
            .expect("Failed to create mock pool");

        let _store = EventStore::new(lazy_pool);
    }
}