// forge_runtime/workflow/event_store.rs

use chrono::{DateTime, Utc};
use sqlx::PgPool;
use uuid::Uuid;

use forge_core::workflow::{WorkflowEvent, WorkflowEventSender};
use forge_core::{ForgeError, Result};

/// Event store for durable workflow events.
///
/// Persists events in the `forge_workflow_events` Postgres table and
/// signals listeners via `pg_notify` so waiting workflows can resume
/// without polling.
pub struct EventStore {
    // Shared Postgres connection pool used by every query in this store.
    pool: PgPool,
}

13impl EventStore {
14    /// Create a new event store.
15    pub fn new(pool: PgPool) -> Self {
16        Self { pool }
17    }
18
19    /// Send an event to a workflow.
20    pub async fn send_event(
21        &self,
22        event_name: &str,
23        correlation_id: &str,
24        payload: Option<serde_json::Value>,
25    ) -> Result<Uuid> {
26        let id = Uuid::new_v4();
27
28        sqlx::query!(
29            r#"
30            INSERT INTO forge_workflow_events (id, event_name, correlation_id, payload)
31            VALUES ($1, $2, $3, $4)
32            "#,
33            id,
34            event_name,
35            correlation_id,
36            payload as _,
37        )
38        .execute(&self.pool)
39        .await
40        .map_err(|e| ForgeError::Database(e.to_string()))?;
41
42        // Send notification for immediate processing
43        sqlx::query_scalar!(
44            "SELECT pg_notify('forge_workflow_events', $1)",
45            format!("{}:{}", event_name, correlation_id),
46        )
47        .fetch_one(&self.pool)
48        .await
49        .map_err(|e| ForgeError::Database(e.to_string()))?;
50
51        tracing::debug!(
52            event_id = %id,
53            event_name = %event_name,
54            correlation_id = %correlation_id,
55            "Workflow event sent"
56        );
57
58        Ok(id)
59    }
60
61    /// Consume an event for a workflow.
62    #[allow(clippy::type_complexity)]
63    pub async fn consume_event(
64        &self,
65        event_name: &str,
66        correlation_id: &str,
67        workflow_run_id: Uuid,
68    ) -> Result<Option<WorkflowEvent>> {
69        let result = sqlx::query!(
70            r#"
71                UPDATE forge_workflow_events
72                SET consumed_at = NOW(), consumed_by = $3
73                WHERE id = (
74                    SELECT id FROM forge_workflow_events
75                    WHERE event_name = $1 AND correlation_id = $2 AND consumed_at IS NULL
76                    ORDER BY created_at ASC LIMIT 1
77                    FOR UPDATE SKIP LOCKED
78                )
79                RETURNING id, event_name, correlation_id, payload, created_at
80                "#,
81            event_name,
82            correlation_id,
83            workflow_run_id
84        )
85        .fetch_optional(&self.pool)
86        .await
87        .map_err(|e| ForgeError::Database(e.to_string()))?;
88
89        Ok(result.map(|row| WorkflowEvent {
90            id: row.id,
91            event_name: row.event_name,
92            correlation_id: row.correlation_id,
93            payload: row.payload,
94            created_at: row.created_at,
95        }))
96    }
97
98    /// Check if an event exists for a workflow (without consuming).
99    pub async fn has_event(&self, event_name: &str, correlation_id: &str) -> Result<bool> {
100        let result = sqlx::query_scalar!(
101            r#"
102            SELECT COUNT(*) FROM forge_workflow_events
103            WHERE event_name = $1 AND correlation_id = $2 AND consumed_at IS NULL
104            "#,
105            event_name,
106            correlation_id
107        )
108        .fetch_one(&self.pool)
109        .await
110        .map_err(|e| ForgeError::Database(e.to_string()))?;
111
112        Ok(result.unwrap_or(0) > 0)
113    }
114
115    /// List pending events for a workflow.
116    #[allow(clippy::type_complexity)]
117    pub async fn list_pending_events(&self, correlation_id: &str) -> Result<Vec<WorkflowEvent>> {
118        let results = sqlx::query!(
119            r#"
120                SELECT id, event_name, correlation_id, payload, created_at
121                FROM forge_workflow_events
122                WHERE correlation_id = $1 AND consumed_at IS NULL
123                ORDER BY created_at ASC
124                "#,
125            correlation_id
126        )
127        .fetch_all(&self.pool)
128        .await
129        .map_err(|e| ForgeError::Database(e.to_string()))?;
130
131        Ok(results
132            .into_iter()
133            .map(|row| WorkflowEvent {
134                id: row.id,
135                event_name: row.event_name,
136                correlation_id: row.correlation_id,
137                payload: row.payload,
138                created_at: row.created_at,
139            })
140            .collect())
141    }
142
143    /// Clean up old consumed events.
144    pub async fn cleanup_consumed_events(&self, older_than: DateTime<Utc>) -> Result<u64> {
145        let result = sqlx::query!(
146            r#"
147            DELETE FROM forge_workflow_events
148            WHERE consumed_at IS NOT NULL AND consumed_at < $1
149            "#,
150            older_than,
151        )
152        .execute(&self.pool)
153        .await
154        .map_err(|e| ForgeError::Database(e.to_string()))?;
155
156        Ok(result.rows_affected())
157    }
158}
159
impl WorkflowEventSender for EventStore {
    /// Trait entry point: delegates to the inherent [`EventStore::send_event`]
    /// so trait users get the same durable insert-and-notify behavior.
    async fn send_event(
        &self,
        event_name: &str,
        correlation_id: &str,
        payload: Option<serde_json::Value>,
    ) -> Result<Uuid> {
        // Fully-qualified call targets the inherent method explicitly,
        // avoiding any ambiguity with this trait method of the same name.
        EventStore::send_event(self, event_name, correlation_id, payload).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Constructing an `EventStore` must not require a live database:
    /// `connect_lazy` defers the actual connection until first use.
    #[tokio::test]
    async fn test_event_store_creation() {
        let options = sqlx::postgres::PgPoolOptions::new().max_connections(1);
        let pool = options
            .connect_lazy("postgres://localhost/test")
            .expect("Failed to create mock pool");

        let _store = EventStore::new(pool);
    }
}
185}