forge_runtime/workflow/
event_store.rs1use chrono::{DateTime, Utc};
2use sqlx::PgPool;
3use uuid::Uuid;
4
5use forge_core::workflow::{WorkflowEvent, WorkflowEventSender};
6use forge_core::{ForgeError, Result};
7
8pub struct EventStore {
10 pool: PgPool,
11}
12
13impl EventStore {
14 pub fn new(pool: PgPool) -> Self {
16 Self { pool }
17 }
18
19 pub async fn send_event(
21 &self,
22 event_name: &str,
23 correlation_id: &str,
24 payload: Option<serde_json::Value>,
25 ) -> Result<Uuid> {
26 let id = Uuid::new_v4();
27
28 sqlx::query(
29 r#"
30 INSERT INTO forge_workflow_events (id, event_name, correlation_id, payload)
31 VALUES ($1, $2, $3, $4)
32 "#,
33 )
34 .bind(id)
35 .bind(event_name)
36 .bind(correlation_id)
37 .bind(&payload)
38 .execute(&self.pool)
39 .await
40 .map_err(|e| ForgeError::Database(e.to_string()))?;
41
42 sqlx::query("SELECT pg_notify('forge_workflow_events', $1)")
44 .bind(format!("{}:{}", event_name, correlation_id))
45 .execute(&self.pool)
46 .await
47 .map_err(|e| ForgeError::Database(e.to_string()))?;
48
49 tracing::debug!(
50 event_id = %id,
51 event_name = %event_name,
52 correlation_id = %correlation_id,
53 "Workflow event sent"
54 );
55
56 Ok(id)
57 }
58
59 #[allow(clippy::type_complexity)]
61 pub async fn consume_event(
62 &self,
63 event_name: &str,
64 correlation_id: &str,
65 workflow_run_id: Uuid,
66 ) -> Result<Option<WorkflowEvent>> {
67 let result = sqlx::query!(
68 r#"
69 UPDATE forge_workflow_events
70 SET consumed_at = NOW(), consumed_by = $3
71 WHERE id = (
72 SELECT id FROM forge_workflow_events
73 WHERE event_name = $1 AND correlation_id = $2 AND consumed_at IS NULL
74 ORDER BY created_at ASC LIMIT 1
75 FOR UPDATE SKIP LOCKED
76 )
77 RETURNING id, event_name, correlation_id, payload, created_at
78 "#,
79 event_name,
80 correlation_id,
81 workflow_run_id
82 )
83 .fetch_optional(&self.pool)
84 .await
85 .map_err(|e| ForgeError::Database(e.to_string()))?;
86
87 Ok(result.map(|row| WorkflowEvent {
88 id: row.id,
89 event_name: row.event_name,
90 correlation_id: row.correlation_id,
91 payload: row.payload,
92 created_at: row.created_at,
93 }))
94 }
95
96 pub async fn has_event(&self, event_name: &str, correlation_id: &str) -> Result<bool> {
98 let result = sqlx::query_scalar!(
99 r#"
100 SELECT COUNT(*) FROM forge_workflow_events
101 WHERE event_name = $1 AND correlation_id = $2 AND consumed_at IS NULL
102 "#,
103 event_name,
104 correlation_id
105 )
106 .fetch_one(&self.pool)
107 .await
108 .map_err(|e| ForgeError::Database(e.to_string()))?;
109
110 Ok(result.unwrap_or(0) > 0)
111 }
112
113 #[allow(clippy::type_complexity)]
115 pub async fn list_pending_events(&self, correlation_id: &str) -> Result<Vec<WorkflowEvent>> {
116 let results = sqlx::query!(
117 r#"
118 SELECT id, event_name, correlation_id, payload, created_at
119 FROM forge_workflow_events
120 WHERE correlation_id = $1 AND consumed_at IS NULL
121 ORDER BY created_at ASC
122 "#,
123 correlation_id
124 )
125 .fetch_all(&self.pool)
126 .await
127 .map_err(|e| ForgeError::Database(e.to_string()))?;
128
129 Ok(results
130 .into_iter()
131 .map(|row| WorkflowEvent {
132 id: row.id,
133 event_name: row.event_name,
134 correlation_id: row.correlation_id,
135 payload: row.payload,
136 created_at: row.created_at,
137 })
138 .collect())
139 }
140
141 pub async fn cleanup_consumed_events(&self, older_than: DateTime<Utc>) -> Result<u64> {
143 let result = sqlx::query(
144 r#"
145 DELETE FROM forge_workflow_events
146 WHERE consumed_at IS NOT NULL AND consumed_at < $1
147 "#,
148 )
149 .bind(older_than)
150 .execute(&self.pool)
151 .await
152 .map_err(|e| ForgeError::Database(e.to_string()))?;
153
154 Ok(result.rows_affected())
155 }
156}
157
158impl WorkflowEventSender for EventStore {
159 async fn send_event(
160 &self,
161 event_name: &str,
162 correlation_id: &str,
163 payload: Option<serde_json::Value>,
164 ) -> Result<Uuid> {
165 EventStore::send_event(self, event_name, correlation_id, payload).await
166 }
167}
168
169#[cfg(test)]
170mod tests {
171 use super::*;
172
173 #[tokio::test]
174 async fn test_event_store_creation() {
175 let pool = sqlx::postgres::PgPoolOptions::new()
177 .max_connections(1)
178 .connect_lazy("postgres://localhost/test")
179 .expect("Failed to create mock pool");
180
181 let _store = EventStore::new(pool);
182 }
183}