//! Durable workflow event store (`forge_runtime/workflow/event_store.rs`).
//!
//! Persists workflow events in Postgres and signals listeners via `pg_notify`.

1use chrono::{DateTime, Utc};
2use sqlx::PgPool;
3use uuid::Uuid;
4
5use forge_core::workflow::{WorkflowEvent, WorkflowEventSender};
6use forge_core::{ForgeError, Result};
7
/// Event store for durable workflow events.
///
/// Backed by the Postgres table `forge_workflow_events`; pairs durable rows
/// with `pg_notify` wake-ups on the `forge_workflow_events` channel so
/// listeners can react without polling.
pub struct EventStore {
    // Shared sqlx connection pool used by every query in this store.
    pool: PgPool,
}
12
13impl EventStore {
14    /// Create a new event store.
15    pub fn new(pool: PgPool) -> Self {
16        Self { pool }
17    }
18
19    /// Send an event to a workflow.
20    pub async fn send_event(
21        &self,
22        event_name: &str,
23        correlation_id: &str,
24        payload: Option<serde_json::Value>,
25    ) -> Result<Uuid> {
26        let id = Uuid::new_v4();
27
28        sqlx::query(
29            r#"
30            INSERT INTO forge_workflow_events (id, event_name, correlation_id, payload)
31            VALUES ($1, $2, $3, $4)
32            "#,
33        )
34        .bind(id)
35        .bind(event_name)
36        .bind(correlation_id)
37        .bind(&payload)
38        .execute(&self.pool)
39        .await
40        .map_err(|e| ForgeError::Database(e.to_string()))?;
41
42        // Send notification for immediate processing
43        sqlx::query("SELECT pg_notify('forge_workflow_events', $1)")
44            .bind(format!("{}:{}", event_name, correlation_id))
45            .execute(&self.pool)
46            .await
47            .map_err(|e| ForgeError::Database(e.to_string()))?;
48
49        tracing::debug!(
50            event_id = %id,
51            event_name = %event_name,
52            correlation_id = %correlation_id,
53            "Workflow event sent"
54        );
55
56        Ok(id)
57    }
58
59    /// Consume an event for a workflow.
60    #[allow(clippy::type_complexity)]
61    pub async fn consume_event(
62        &self,
63        event_name: &str,
64        correlation_id: &str,
65        workflow_run_id: Uuid,
66    ) -> Result<Option<WorkflowEvent>> {
67        let result = sqlx::query!(
68            r#"
69                UPDATE forge_workflow_events
70                SET consumed_at = NOW(), consumed_by = $3
71                WHERE id = (
72                    SELECT id FROM forge_workflow_events
73                    WHERE event_name = $1 AND correlation_id = $2 AND consumed_at IS NULL
74                    ORDER BY created_at ASC LIMIT 1
75                    FOR UPDATE SKIP LOCKED
76                )
77                RETURNING id, event_name, correlation_id, payload, created_at
78                "#,
79            event_name,
80            correlation_id,
81            workflow_run_id
82        )
83        .fetch_optional(&self.pool)
84        .await
85        .map_err(|e| ForgeError::Database(e.to_string()))?;
86
87        Ok(result.map(|row| WorkflowEvent {
88            id: row.id,
89            event_name: row.event_name,
90            correlation_id: row.correlation_id,
91            payload: row.payload,
92            created_at: row.created_at,
93        }))
94    }
95
96    /// Check if an event exists for a workflow (without consuming).
97    pub async fn has_event(&self, event_name: &str, correlation_id: &str) -> Result<bool> {
98        let result = sqlx::query_scalar!(
99            r#"
100            SELECT COUNT(*) FROM forge_workflow_events
101            WHERE event_name = $1 AND correlation_id = $2 AND consumed_at IS NULL
102            "#,
103            event_name,
104            correlation_id
105        )
106        .fetch_one(&self.pool)
107        .await
108        .map_err(|e| ForgeError::Database(e.to_string()))?;
109
110        Ok(result.unwrap_or(0) > 0)
111    }
112
113    /// List pending events for a workflow.
114    #[allow(clippy::type_complexity)]
115    pub async fn list_pending_events(&self, correlation_id: &str) -> Result<Vec<WorkflowEvent>> {
116        let results = sqlx::query!(
117            r#"
118                SELECT id, event_name, correlation_id, payload, created_at
119                FROM forge_workflow_events
120                WHERE correlation_id = $1 AND consumed_at IS NULL
121                ORDER BY created_at ASC
122                "#,
123            correlation_id
124        )
125        .fetch_all(&self.pool)
126        .await
127        .map_err(|e| ForgeError::Database(e.to_string()))?;
128
129        Ok(results
130            .into_iter()
131            .map(|row| WorkflowEvent {
132                id: row.id,
133                event_name: row.event_name,
134                correlation_id: row.correlation_id,
135                payload: row.payload,
136                created_at: row.created_at,
137            })
138            .collect())
139    }
140
141    /// Clean up old consumed events.
142    pub async fn cleanup_consumed_events(&self, older_than: DateTime<Utc>) -> Result<u64> {
143        let result = sqlx::query(
144            r#"
145            DELETE FROM forge_workflow_events
146            WHERE consumed_at IS NOT NULL AND consumed_at < $1
147            "#,
148        )
149        .bind(older_than)
150        .execute(&self.pool)
151        .await
152        .map_err(|e| ForgeError::Database(e.to_string()))?;
153
154        Ok(result.rows_affected())
155    }
156}
157
158impl WorkflowEventSender for EventStore {
159    async fn send_event(
160        &self,
161        event_name: &str,
162        correlation_id: &str,
163        payload: Option<serde_json::Value>,
164    ) -> Result<Uuid> {
165        EventStore::send_event(self, event_name, correlation_id, payload).await
166    }
167}
168
#[cfg(test)]
mod tests {
    use super::*;

    /// Building an `EventStore` must not require a live database: the pool
    /// is created lazily, so no connection is attempted here.
    #[tokio::test]
    async fn test_event_store_creation() {
        let lazy_pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect_lazy("postgres://localhost/test")
            .expect("Failed to create mock pool");

        let _ = EventStore::new(lazy_pool);
    }
}
183}