forge_runtime/workflow/
event_store.rs1use chrono::{DateTime, Utc};
2use sqlx::PgPool;
3use uuid::Uuid;
4
5use forge_core::workflow::{WorkflowEvent, WorkflowEventSender};
6use forge_core::{ForgeError, Result};
7
8pub struct EventStore {
10 pool: PgPool,
11}
12
13impl EventStore {
14 pub fn new(pool: PgPool) -> Self {
16 Self { pool }
17 }
18
19 pub async fn send_event(
21 &self,
22 event_name: &str,
23 correlation_id: &str,
24 payload: Option<serde_json::Value>,
25 ) -> Result<Uuid> {
26 let id = Uuid::new_v4();
27
28 sqlx::query(
29 r#"
30 INSERT INTO forge_workflow_events (id, event_name, correlation_id, payload)
31 VALUES ($1, $2, $3, $4)
32 "#,
33 )
34 .bind(id)
35 .bind(event_name)
36 .bind(correlation_id)
37 .bind(&payload)
38 .execute(&self.pool)
39 .await
40 .map_err(|e| ForgeError::Database(e.to_string()))?;
41
42 sqlx::query("SELECT pg_notify('forge_workflow_events', $1)")
44 .bind(format!("{}:{}", event_name, correlation_id))
45 .execute(&self.pool)
46 .await
47 .map_err(|e| ForgeError::Database(e.to_string()))?;
48
49 tracing::debug!(
50 event_id = %id,
51 event_name = %event_name,
52 correlation_id = %correlation_id,
53 "Workflow event sent"
54 );
55
56 Ok(id)
57 }
58
59 #[allow(clippy::type_complexity)]
61 pub async fn consume_event(
62 &self,
63 event_name: &str,
64 correlation_id: &str,
65 workflow_run_id: Uuid,
66 ) -> Result<Option<WorkflowEvent>> {
67 let result: Option<(
68 Uuid,
69 String,
70 String,
71 Option<serde_json::Value>,
72 DateTime<Utc>,
73 )> = sqlx::query_as(
74 r#"
75 UPDATE forge_workflow_events
76 SET consumed_at = NOW(), consumed_by = $3
77 WHERE id = (
78 SELECT id FROM forge_workflow_events
79 WHERE event_name = $1 AND correlation_id = $2 AND consumed_at IS NULL
80 ORDER BY created_at ASC LIMIT 1
81 FOR UPDATE SKIP LOCKED
82 )
83 RETURNING id, event_name, correlation_id, payload, created_at
84 "#,
85 )
86 .bind(event_name)
87 .bind(correlation_id)
88 .bind(workflow_run_id)
89 .fetch_optional(&self.pool)
90 .await
91 .map_err(|e| ForgeError::Database(e.to_string()))?;
92
93 Ok(result.map(
94 |(id, event_name, correlation_id, payload, created_at)| WorkflowEvent {
95 id,
96 event_name,
97 correlation_id,
98 payload,
99 created_at,
100 },
101 ))
102 }
103
104 pub async fn has_event(&self, event_name: &str, correlation_id: &str) -> Result<bool> {
106 let result: (i64,) = sqlx::query_as(
107 r#"
108 SELECT COUNT(*) FROM forge_workflow_events
109 WHERE event_name = $1 AND correlation_id = $2 AND consumed_at IS NULL
110 "#,
111 )
112 .bind(event_name)
113 .bind(correlation_id)
114 .fetch_one(&self.pool)
115 .await
116 .map_err(|e| ForgeError::Database(e.to_string()))?;
117
118 Ok(result.0 > 0)
119 }
120
121 #[allow(clippy::type_complexity)]
123 pub async fn list_pending_events(&self, correlation_id: &str) -> Result<Vec<WorkflowEvent>> {
124 let results: Vec<(
125 Uuid,
126 String,
127 String,
128 Option<serde_json::Value>,
129 DateTime<Utc>,
130 )> = sqlx::query_as(
131 r#"
132 SELECT id, event_name, correlation_id, payload, created_at
133 FROM forge_workflow_events
134 WHERE correlation_id = $1 AND consumed_at IS NULL
135 ORDER BY created_at ASC
136 "#,
137 )
138 .bind(correlation_id)
139 .fetch_all(&self.pool)
140 .await
141 .map_err(|e| ForgeError::Database(e.to_string()))?;
142
143 Ok(results
144 .into_iter()
145 .map(
146 |(id, event_name, correlation_id, payload, created_at)| WorkflowEvent {
147 id,
148 event_name,
149 correlation_id,
150 payload,
151 created_at,
152 },
153 )
154 .collect())
155 }
156
157 pub async fn cleanup_consumed_events(&self, older_than: DateTime<Utc>) -> Result<u64> {
159 let result = sqlx::query(
160 r#"
161 DELETE FROM forge_workflow_events
162 WHERE consumed_at IS NOT NULL AND consumed_at < $1
163 "#,
164 )
165 .bind(older_than)
166 .execute(&self.pool)
167 .await
168 .map_err(|e| ForgeError::Database(e.to_string()))?;
169
170 Ok(result.rows_affected())
171 }
172}
173
174impl WorkflowEventSender for EventStore {
175 async fn send_event(
176 &self,
177 event_name: &str,
178 correlation_id: &str,
179 payload: Option<serde_json::Value>,
180 ) -> Result<Uuid> {
181 EventStore::send_event(self, event_name, correlation_id, payload).await
182 }
183}
184
185#[cfg(test)]
186mod tests {
187 use super::*;
188
189 #[tokio::test]
190 async fn test_event_store_creation() {
191 let pool = sqlx::postgres::PgPoolOptions::new()
193 .max_connections(1)
194 .connect_lazy("postgres://localhost/test")
195 .expect("Failed to create mock pool");
196
197 let _store = EventStore::new(pool);
198 }
199}