forge_runtime/workflow/
event_store.rs

1use chrono::{DateTime, Utc};
2use sqlx::PgPool;
3use uuid::Uuid;
4
5use forge_core::workflow::{WorkflowEvent, WorkflowEventSender};
6use forge_core::{ForgeError, Result};
7
8/// Event store for durable workflow events.
9pub struct EventStore {
10    pool: PgPool,
11}
12
13impl EventStore {
14    /// Create a new event store.
15    pub fn new(pool: PgPool) -> Self {
16        Self { pool }
17    }
18
19    /// Send an event to a workflow.
20    pub async fn send_event(
21        &self,
22        event_name: &str,
23        correlation_id: &str,
24        payload: Option<serde_json::Value>,
25    ) -> Result<Uuid> {
26        let id = Uuid::new_v4();
27
28        sqlx::query(
29            r#"
30            INSERT INTO forge_workflow_events (id, event_name, correlation_id, payload)
31            VALUES ($1, $2, $3, $4)
32            "#,
33        )
34        .bind(id)
35        .bind(event_name)
36        .bind(correlation_id)
37        .bind(&payload)
38        .execute(&self.pool)
39        .await
40        .map_err(|e| ForgeError::Database(e.to_string()))?;
41
42        // Send notification for immediate processing
43        sqlx::query("SELECT pg_notify('forge_workflow_events', $1)")
44            .bind(format!("{}:{}", event_name, correlation_id))
45            .execute(&self.pool)
46            .await
47            .map_err(|e| ForgeError::Database(e.to_string()))?;
48
49        tracing::debug!(
50            event_id = %id,
51            event_name = %event_name,
52            correlation_id = %correlation_id,
53            "Workflow event sent"
54        );
55
56        Ok(id)
57    }
58
59    /// Consume an event for a workflow.
60    #[allow(clippy::type_complexity)]
61    pub async fn consume_event(
62        &self,
63        event_name: &str,
64        correlation_id: &str,
65        workflow_run_id: Uuid,
66    ) -> Result<Option<WorkflowEvent>> {
67        let result: Option<(
68            Uuid,
69            String,
70            String,
71            Option<serde_json::Value>,
72            DateTime<Utc>,
73        )> = sqlx::query_as(
74            r#"
75                UPDATE forge_workflow_events
76                SET consumed_at = NOW(), consumed_by = $3
77                WHERE id = (
78                    SELECT id FROM forge_workflow_events
79                    WHERE event_name = $1 AND correlation_id = $2 AND consumed_at IS NULL
80                    ORDER BY created_at ASC LIMIT 1
81                    FOR UPDATE SKIP LOCKED
82                )
83                RETURNING id, event_name, correlation_id, payload, created_at
84                "#,
85        )
86        .bind(event_name)
87        .bind(correlation_id)
88        .bind(workflow_run_id)
89        .fetch_optional(&self.pool)
90        .await
91        .map_err(|e| ForgeError::Database(e.to_string()))?;
92
93        Ok(result.map(
94            |(id, event_name, correlation_id, payload, created_at)| WorkflowEvent {
95                id,
96                event_name,
97                correlation_id,
98                payload,
99                created_at,
100            },
101        ))
102    }
103
104    /// Check if an event exists for a workflow (without consuming).
105    pub async fn has_event(&self, event_name: &str, correlation_id: &str) -> Result<bool> {
106        let result: (i64,) = sqlx::query_as(
107            r#"
108            SELECT COUNT(*) FROM forge_workflow_events
109            WHERE event_name = $1 AND correlation_id = $2 AND consumed_at IS NULL
110            "#,
111        )
112        .bind(event_name)
113        .bind(correlation_id)
114        .fetch_one(&self.pool)
115        .await
116        .map_err(|e| ForgeError::Database(e.to_string()))?;
117
118        Ok(result.0 > 0)
119    }
120
121    /// List pending events for a workflow.
122    #[allow(clippy::type_complexity)]
123    pub async fn list_pending_events(&self, correlation_id: &str) -> Result<Vec<WorkflowEvent>> {
124        let results: Vec<(
125            Uuid,
126            String,
127            String,
128            Option<serde_json::Value>,
129            DateTime<Utc>,
130        )> = sqlx::query_as(
131            r#"
132                SELECT id, event_name, correlation_id, payload, created_at
133                FROM forge_workflow_events
134                WHERE correlation_id = $1 AND consumed_at IS NULL
135                ORDER BY created_at ASC
136                "#,
137        )
138        .bind(correlation_id)
139        .fetch_all(&self.pool)
140        .await
141        .map_err(|e| ForgeError::Database(e.to_string()))?;
142
143        Ok(results
144            .into_iter()
145            .map(
146                |(id, event_name, correlation_id, payload, created_at)| WorkflowEvent {
147                    id,
148                    event_name,
149                    correlation_id,
150                    payload,
151                    created_at,
152                },
153            )
154            .collect())
155    }
156
157    /// Clean up old consumed events.
158    pub async fn cleanup_consumed_events(&self, older_than: DateTime<Utc>) -> Result<u64> {
159        let result = sqlx::query(
160            r#"
161            DELETE FROM forge_workflow_events
162            WHERE consumed_at IS NOT NULL AND consumed_at < $1
163            "#,
164        )
165        .bind(older_than)
166        .execute(&self.pool)
167        .await
168        .map_err(|e| ForgeError::Database(e.to_string()))?;
169
170        Ok(result.rows_affected())
171    }
172}
173
174impl WorkflowEventSender for EventStore {
175    async fn send_event(
176        &self,
177        event_name: &str,
178        correlation_id: &str,
179        payload: Option<serde_json::Value>,
180    ) -> Result<Uuid> {
181        EventStore::send_event(self, event_name, correlation_id, payload).await
182    }
183}
184
185#[cfg(test)]
186mod tests {
187    use super::*;
188
189    #[tokio::test]
190    async fn test_event_store_creation() {
191        // Just test that the struct can be created
192        let pool = sqlx::postgres::PgPoolOptions::new()
193            .max_connections(1)
194            .connect_lazy("postgres://localhost/test")
195            .expect("Failed to create mock pool");
196
197        let _store = EventStore::new(pool);
198    }
199}