Skip to main content

shaperail_runtime/events/
log.rs

1use serde::{Deserialize, Serialize};
2use shaperail_core::ShaperailError;
3use sqlx::Row;
4
5/// An append-only event log record for audit and replay.
6#[derive(Debug, Clone, Serialize, Deserialize)]
7pub struct EventRecord {
8    /// Unique event ID.
9    pub event_id: String,
10    /// Event name (e.g., "users.created").
11    pub event: String,
12    /// Resource name.
13    pub resource: String,
14    /// Action that triggered the event.
15    pub action: String,
16    /// Event data payload.
17    pub data: serde_json::Value,
18    /// ISO 8601 timestamp.
19    pub timestamp: String,
20}
21
22/// Append-only event log backed by the database.
23///
24/// Stores all emitted events for audit trails and replay.
25/// Uses an append-only pattern — no UPDATE or DELETE operations.
26#[derive(Clone)]
27pub struct EventLog {
28    pool: sqlx::PgPool,
29}
30
31impl EventLog {
32    /// Creates a new event log.
33    pub fn new(pool: sqlx::PgPool) -> Self {
34        Self { pool }
35    }
36
37    /// Appends an event record to the log.
38    pub async fn append(&self, record: &EventRecord) -> Result<(), ShaperailError> {
39        sqlx::query(
40            r#"INSERT INTO shaperail_event_log (event_id, event, resource, action, data, timestamp)
41               VALUES ($1, $2, $3, $4, $5, $6)"#,
42        )
43        .bind(&record.event_id)
44        .bind(&record.event)
45        .bind(&record.resource)
46        .bind(&record.action)
47        .bind(&record.data)
48        .bind(&record.timestamp)
49        .execute(&self.pool)
50        .await
51        .map_err(|e| ShaperailError::Internal(format!("Failed to log event: {e}")))?;
52
53        Ok(())
54    }
55
56    /// Retrieves recent events, ordered by timestamp descending.
57    pub async fn recent(&self, limit: i64) -> Result<Vec<EventRecord>, ShaperailError> {
58        let rows = sqlx::query(
59            r#"SELECT event_id, event, resource, action, data, timestamp
60               FROM shaperail_event_log
61               ORDER BY timestamp DESC
62               LIMIT $1"#,
63        )
64        .bind(limit)
65        .fetch_all(&self.pool)
66        .await
67        .map_err(|e| ShaperailError::Internal(format!("Failed to query event log: {e}")))?;
68
69        rows.iter().map(row_to_event_record).collect()
70    }
71
72    /// Retrieves events for a specific resource.
73    pub async fn for_resource(
74        &self,
75        resource: &str,
76        limit: i64,
77    ) -> Result<Vec<EventRecord>, ShaperailError> {
78        let rows = sqlx::query(
79            r#"SELECT event_id, event, resource, action, data, timestamp
80               FROM shaperail_event_log
81               WHERE resource = $1
82               ORDER BY timestamp DESC
83               LIMIT $2"#,
84        )
85        .bind(resource)
86        .bind(limit)
87        .fetch_all(&self.pool)
88        .await
89        .map_err(|e| ShaperailError::Internal(format!("Failed to query event log: {e}")))?;
90
91        rows.iter().map(row_to_event_record).collect()
92    }
93}
94
95fn row_to_event_record(row: &sqlx::postgres::PgRow) -> Result<EventRecord, ShaperailError> {
96    Ok(EventRecord {
97        event_id: row
98            .try_get("event_id")
99            .map_err(|e| ShaperailError::Internal(format!("Missing event_id column: {e}")))?,
100        event: row
101            .try_get("event")
102            .map_err(|e| ShaperailError::Internal(format!("Missing event column: {e}")))?,
103        resource: row
104            .try_get("resource")
105            .map_err(|e| ShaperailError::Internal(format!("Missing resource column: {e}")))?,
106        action: row
107            .try_get("action")
108            .map_err(|e| ShaperailError::Internal(format!("Missing action column: {e}")))?,
109        data: row
110            .try_get("data")
111            .map_err(|e| ShaperailError::Internal(format!("Missing data column: {e}")))?,
112        timestamp: row
113            .try_get("timestamp")
114            .map_err(|e| ShaperailError::Internal(format!("Missing timestamp column: {e}")))?,
115    })
116}
117
118/// Webhook delivery status.
119#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct WebhookDeliveryRecord {
121    /// Unique delivery ID.
122    pub delivery_id: String,
123    /// The event ID that triggered this delivery.
124    pub event_id: String,
125    /// Target webhook URL.
126    pub url: String,
127    /// HTTP status code from the target (0 if connection failed).
128    pub status_code: i32,
129    /// Delivery status: "success", "failed", "pending".
130    pub status: String,
131    /// Response latency in milliseconds.
132    pub latency_ms: i64,
133    /// Error message if delivery failed.
134    pub error: Option<String>,
135    /// Number of attempts made.
136    pub attempt: i32,
137    /// ISO 8601 timestamp.
138    pub timestamp: String,
139}
140
141/// Webhook delivery log backed by the database.
142#[derive(Clone)]
143pub struct WebhookDeliveryLog {
144    pool: sqlx::PgPool,
145}
146
147impl WebhookDeliveryLog {
148    /// Creates a new delivery log.
149    pub fn new(pool: sqlx::PgPool) -> Self {
150        Self { pool }
151    }
152
153    /// Records a webhook delivery attempt.
154    pub async fn record(&self, record: &WebhookDeliveryRecord) -> Result<(), ShaperailError> {
155        sqlx::query(
156            r#"INSERT INTO shaperail_webhook_delivery_log
157               (delivery_id, event_id, url, status_code, status, latency_ms, error, attempt, timestamp)
158               VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)"#,
159        )
160        .bind(&record.delivery_id)
161        .bind(&record.event_id)
162        .bind(&record.url)
163        .bind(record.status_code)
164        .bind(&record.status)
165        .bind(record.latency_ms)
166        .bind(&record.error)
167        .bind(record.attempt)
168        .bind(&record.timestamp)
169        .execute(&self.pool)
170        .await
171        .map_err(|e| ShaperailError::Internal(format!("Failed to log webhook delivery: {e}")))?;
172
173        Ok(())
174    }
175
176    /// Retrieves delivery records for a specific event.
177    pub async fn for_event(
178        &self,
179        event_id: &str,
180    ) -> Result<Vec<WebhookDeliveryRecord>, ShaperailError> {
181        let rows = sqlx::query(
182            r#"SELECT delivery_id, event_id, url, status_code, status, latency_ms, error, attempt, timestamp
183               FROM shaperail_webhook_delivery_log
184               WHERE event_id = $1
185               ORDER BY timestamp DESC"#,
186        )
187        .bind(event_id)
188        .fetch_all(&self.pool)
189        .await
190        .map_err(|e| ShaperailError::Internal(format!("Failed to query delivery log: {e}")))?;
191
192        rows.iter().map(row_to_delivery_record).collect()
193    }
194
195    /// Retrieves recent deliveries.
196    pub async fn recent(&self, limit: i64) -> Result<Vec<WebhookDeliveryRecord>, ShaperailError> {
197        let rows = sqlx::query(
198            r#"SELECT delivery_id, event_id, url, status_code, status, latency_ms, error, attempt, timestamp
199               FROM shaperail_webhook_delivery_log
200               ORDER BY timestamp DESC
201               LIMIT $1"#,
202        )
203        .bind(limit)
204        .fetch_all(&self.pool)
205        .await
206        .map_err(|e| ShaperailError::Internal(format!("Failed to query delivery log: {e}")))?;
207
208        rows.iter().map(row_to_delivery_record).collect()
209    }
210}
211
212fn row_to_delivery_record(
213    row: &sqlx::postgres::PgRow,
214) -> Result<WebhookDeliveryRecord, ShaperailError> {
215    Ok(WebhookDeliveryRecord {
216        delivery_id: row
217            .try_get("delivery_id")
218            .map_err(|e| ShaperailError::Internal(format!("Missing delivery_id column: {e}")))?,
219        event_id: row
220            .try_get("event_id")
221            .map_err(|e| ShaperailError::Internal(format!("Missing event_id column: {e}")))?,
222        url: row
223            .try_get("url")
224            .map_err(|e| ShaperailError::Internal(format!("Missing url column: {e}")))?,
225        status_code: row
226            .try_get("status_code")
227            .map_err(|e| ShaperailError::Internal(format!("Missing status_code column: {e}")))?,
228        status: row
229            .try_get("status")
230            .map_err(|e| ShaperailError::Internal(format!("Missing status column: {e}")))?,
231        latency_ms: row
232            .try_get("latency_ms")
233            .map_err(|e| ShaperailError::Internal(format!("Missing latency_ms column: {e}")))?,
234        error: row
235            .try_get("error")
236            .map_err(|e| ShaperailError::Internal(format!("Missing error column: {e}")))?,
237        attempt: row
238            .try_get("attempt")
239            .map_err(|e| ShaperailError::Internal(format!("Missing attempt column: {e}")))?,
240        timestamp: row
241            .try_get("timestamp")
242            .map_err(|e| ShaperailError::Internal(format!("Missing timestamp column: {e}")))?,
243    })
244}
245
246#[cfg(test)]
247mod tests {
248    use super::*;
249
250    #[test]
251    fn event_record_serde() {
252        let record = EventRecord {
253            event_id: "evt-001".to_string(),
254            event: "users.created".to_string(),
255            resource: "users".to_string(),
256            action: "created".to_string(),
257            data: serde_json::json!({"id": "123"}),
258            timestamp: "2026-01-01T00:00:00Z".to_string(),
259        };
260        let json = serde_json::to_string(&record).unwrap();
261        let back: EventRecord = serde_json::from_str(&json).unwrap();
262        assert_eq!(back.event_id, "evt-001");
263        assert_eq!(back.event, "users.created");
264    }
265
266    #[test]
267    fn delivery_record_serde() {
268        let record = WebhookDeliveryRecord {
269            delivery_id: "del-001".to_string(),
270            event_id: "evt-001".to_string(),
271            url: "https://example.com/hook".to_string(),
272            status_code: 200,
273            status: "success".to_string(),
274            latency_ms: 150,
275            error: None,
276            attempt: 1,
277            timestamp: "2026-01-01T00:00:00Z".to_string(),
278        };
279        let json = serde_json::to_string(&record).unwrap();
280        let back: WebhookDeliveryRecord = serde_json::from_str(&json).unwrap();
281        assert_eq!(back.delivery_id, "del-001");
282        assert_eq!(back.status_code, 200);
283        assert!(back.error.is_none());
284    }
285
286    #[test]
287    fn delivery_record_with_error() {
288        let record = WebhookDeliveryRecord {
289            delivery_id: "del-002".to_string(),
290            event_id: "evt-001".to_string(),
291            url: "https://example.com/hook".to_string(),
292            status_code: 500,
293            status: "failed".to_string(),
294            latency_ms: 3000,
295            error: Some("Internal Server Error".to_string()),
296            attempt: 3,
297            timestamp: "2026-01-01T00:00:00Z".to_string(),
298        };
299        let json = serde_json::to_string(&record).unwrap();
300        let back: WebhookDeliveryRecord = serde_json::from_str(&json).unwrap();
301        assert_eq!(back.status, "failed");
302        assert_eq!(back.error.as_deref(), Some("Internal Server Error"));
303    }
304}