shaperail_runtime/events/
log.rs1use serde::{Deserialize, Serialize};
2use shaperail_core::ShaperailError;
3use sqlx::Row;
4
5#[derive(Debug, Clone, Serialize, Deserialize)]
7pub struct EventRecord {
8 pub event_id: String,
10 pub event: String,
12 pub resource: String,
14 pub action: String,
16 pub data: serde_json::Value,
18 pub timestamp: String,
20}
21
22#[derive(Clone)]
27pub struct EventLog {
28 pool: sqlx::PgPool,
29}
30
31impl EventLog {
32 pub fn new(pool: sqlx::PgPool) -> Self {
34 Self { pool }
35 }
36
37 pub async fn append(&self, record: &EventRecord) -> Result<(), ShaperailError> {
39 sqlx::query(
40 r#"INSERT INTO shaperail_event_log (event_id, event, resource, action, data, timestamp)
41 VALUES ($1, $2, $3, $4, $5, $6)"#,
42 )
43 .bind(&record.event_id)
44 .bind(&record.event)
45 .bind(&record.resource)
46 .bind(&record.action)
47 .bind(&record.data)
48 .bind(&record.timestamp)
49 .execute(&self.pool)
50 .await
51 .map_err(|e| ShaperailError::Internal(format!("Failed to log event: {e}")))?;
52
53 Ok(())
54 }
55
56 pub async fn recent(&self, limit: i64) -> Result<Vec<EventRecord>, ShaperailError> {
58 let rows = sqlx::query(
59 r#"SELECT event_id, event, resource, action, data, timestamp
60 FROM shaperail_event_log
61 ORDER BY timestamp DESC
62 LIMIT $1"#,
63 )
64 .bind(limit)
65 .fetch_all(&self.pool)
66 .await
67 .map_err(|e| ShaperailError::Internal(format!("Failed to query event log: {e}")))?;
68
69 rows.iter().map(row_to_event_record).collect()
70 }
71
72 pub async fn for_resource(
74 &self,
75 resource: &str,
76 limit: i64,
77 ) -> Result<Vec<EventRecord>, ShaperailError> {
78 let rows = sqlx::query(
79 r#"SELECT event_id, event, resource, action, data, timestamp
80 FROM shaperail_event_log
81 WHERE resource = $1
82 ORDER BY timestamp DESC
83 LIMIT $2"#,
84 )
85 .bind(resource)
86 .bind(limit)
87 .fetch_all(&self.pool)
88 .await
89 .map_err(|e| ShaperailError::Internal(format!("Failed to query event log: {e}")))?;
90
91 rows.iter().map(row_to_event_record).collect()
92 }
93}
94
95fn row_to_event_record(row: &sqlx::postgres::PgRow) -> Result<EventRecord, ShaperailError> {
96 Ok(EventRecord {
97 event_id: row
98 .try_get("event_id")
99 .map_err(|e| ShaperailError::Internal(format!("Missing event_id column: {e}")))?,
100 event: row
101 .try_get("event")
102 .map_err(|e| ShaperailError::Internal(format!("Missing event column: {e}")))?,
103 resource: row
104 .try_get("resource")
105 .map_err(|e| ShaperailError::Internal(format!("Missing resource column: {e}")))?,
106 action: row
107 .try_get("action")
108 .map_err(|e| ShaperailError::Internal(format!("Missing action column: {e}")))?,
109 data: row
110 .try_get("data")
111 .map_err(|e| ShaperailError::Internal(format!("Missing data column: {e}")))?,
112 timestamp: row
113 .try_get("timestamp")
114 .map_err(|e| ShaperailError::Internal(format!("Missing timestamp column: {e}")))?,
115 })
116}
117
118#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct WebhookDeliveryRecord {
121 pub delivery_id: String,
123 pub event_id: String,
125 pub url: String,
127 pub status_code: i32,
129 pub status: String,
131 pub latency_ms: i64,
133 pub error: Option<String>,
135 pub attempt: i32,
137 pub timestamp: String,
139}
140
141#[derive(Clone)]
143pub struct WebhookDeliveryLog {
144 pool: sqlx::PgPool,
145}
146
147impl WebhookDeliveryLog {
148 pub fn new(pool: sqlx::PgPool) -> Self {
150 Self { pool }
151 }
152
153 pub async fn record(&self, record: &WebhookDeliveryRecord) -> Result<(), ShaperailError> {
155 sqlx::query(
156 r#"INSERT INTO shaperail_webhook_delivery_log
157 (delivery_id, event_id, url, status_code, status, latency_ms, error, attempt, timestamp)
158 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)"#,
159 )
160 .bind(&record.delivery_id)
161 .bind(&record.event_id)
162 .bind(&record.url)
163 .bind(record.status_code)
164 .bind(&record.status)
165 .bind(record.latency_ms)
166 .bind(&record.error)
167 .bind(record.attempt)
168 .bind(&record.timestamp)
169 .execute(&self.pool)
170 .await
171 .map_err(|e| ShaperailError::Internal(format!("Failed to log webhook delivery: {e}")))?;
172
173 Ok(())
174 }
175
176 pub async fn for_event(
178 &self,
179 event_id: &str,
180 ) -> Result<Vec<WebhookDeliveryRecord>, ShaperailError> {
181 let rows = sqlx::query(
182 r#"SELECT delivery_id, event_id, url, status_code, status, latency_ms, error, attempt, timestamp
183 FROM shaperail_webhook_delivery_log
184 WHERE event_id = $1
185 ORDER BY timestamp DESC"#,
186 )
187 .bind(event_id)
188 .fetch_all(&self.pool)
189 .await
190 .map_err(|e| ShaperailError::Internal(format!("Failed to query delivery log: {e}")))?;
191
192 rows.iter().map(row_to_delivery_record).collect()
193 }
194
195 pub async fn recent(&self, limit: i64) -> Result<Vec<WebhookDeliveryRecord>, ShaperailError> {
197 let rows = sqlx::query(
198 r#"SELECT delivery_id, event_id, url, status_code, status, latency_ms, error, attempt, timestamp
199 FROM shaperail_webhook_delivery_log
200 ORDER BY timestamp DESC
201 LIMIT $1"#,
202 )
203 .bind(limit)
204 .fetch_all(&self.pool)
205 .await
206 .map_err(|e| ShaperailError::Internal(format!("Failed to query delivery log: {e}")))?;
207
208 rows.iter().map(row_to_delivery_record).collect()
209 }
210}
211
212fn row_to_delivery_record(
213 row: &sqlx::postgres::PgRow,
214) -> Result<WebhookDeliveryRecord, ShaperailError> {
215 Ok(WebhookDeliveryRecord {
216 delivery_id: row
217 .try_get("delivery_id")
218 .map_err(|e| ShaperailError::Internal(format!("Missing delivery_id column: {e}")))?,
219 event_id: row
220 .try_get("event_id")
221 .map_err(|e| ShaperailError::Internal(format!("Missing event_id column: {e}")))?,
222 url: row
223 .try_get("url")
224 .map_err(|e| ShaperailError::Internal(format!("Missing url column: {e}")))?,
225 status_code: row
226 .try_get("status_code")
227 .map_err(|e| ShaperailError::Internal(format!("Missing status_code column: {e}")))?,
228 status: row
229 .try_get("status")
230 .map_err(|e| ShaperailError::Internal(format!("Missing status column: {e}")))?,
231 latency_ms: row
232 .try_get("latency_ms")
233 .map_err(|e| ShaperailError::Internal(format!("Missing latency_ms column: {e}")))?,
234 error: row
235 .try_get("error")
236 .map_err(|e| ShaperailError::Internal(format!("Missing error column: {e}")))?,
237 attempt: row
238 .try_get("attempt")
239 .map_err(|e| ShaperailError::Internal(format!("Missing attempt column: {e}")))?,
240 timestamp: row
241 .try_get("timestamp")
242 .map_err(|e| ShaperailError::Internal(format!("Missing timestamp column: {e}")))?,
243 })
244}
245
#[cfg(test)]
mod tests {
    use super::*;

    /// An `EventRecord` survives a JSON text round trip.
    #[test]
    fn event_record_serde() {
        let original = EventRecord {
            event_id: String::from("evt-001"),
            event: String::from("users.created"),
            resource: String::from("users"),
            action: String::from("created"),
            data: serde_json::json!({"id": "123"}),
            timestamp: String::from("2026-01-01T00:00:00Z"),
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: EventRecord = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.event_id, "evt-001");
        assert_eq!(decoded.event, "users.created");
    }

    /// A successful delivery (no error text) survives a JSON text round trip.
    #[test]
    fn delivery_record_serde() {
        let original = WebhookDeliveryRecord {
            delivery_id: String::from("del-001"),
            event_id: String::from("evt-001"),
            url: String::from("https://example.com/hook"),
            status_code: 200,
            status: String::from("success"),
            latency_ms: 150,
            error: None,
            attempt: 1,
            timestamp: String::from("2026-01-01T00:00:00Z"),
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: WebhookDeliveryRecord = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.delivery_id, "del-001");
        assert_eq!(decoded.status_code, 200);
        assert!(decoded.error.is_none());
    }

    /// A failed delivery keeps its status and error text across a round trip.
    #[test]
    fn delivery_record_with_error() {
        let original = WebhookDeliveryRecord {
            delivery_id: String::from("del-002"),
            event_id: String::from("evt-001"),
            url: String::from("https://example.com/hook"),
            status_code: 500,
            status: String::from("failed"),
            latency_ms: 3000,
            error: Some(String::from("Internal Server Error")),
            attempt: 3,
            timestamp: String::from("2026-01-01T00:00:00Z"),
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: WebhookDeliveryRecord = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.status, "failed");
        assert_eq!(decoded.error.as_deref(), Some("Internal Server Error"));
    }
}