1use serde::{Deserialize, Serialize};
2use sha2::{Digest, Sha256};
3use sqlx::{Row, Sqlite, SqlitePool, Transaction};
4
5use punkgo_core::errors::KernelResult;
6
7#[derive(Debug, Clone, Serialize, Deserialize)]
8pub struct EventRecord {
9 pub id: String,
10 pub log_index: i64,
11 pub event_hash: String,
12 pub actor_id: String,
13 pub action_type: String,
14 pub target: String,
15 pub payload: serde_json::Value,
16 pub payload_hash: String,
17 pub artifact_hash: Option<String>,
19 pub reserved_energy: i64,
20 pub settled_energy: i64,
21 pub timestamp: String,
22}
23
24#[derive(Serialize)]
28struct CanonicalEvent<'a> {
29 action_type: &'a str,
30 actor_id: &'a str,
31 #[serde(skip_serializing_if = "Option::is_none")]
32 artifact_hash: Option<&'a str>,
33 event_id: &'a str,
34 log_index: i64,
35 payload_hash: &'a str,
36 reserved_energy: i64,
37 settled_energy: i64,
38 timestamp: &'a str,
39 v: &'static str,
40}
41
42fn compute_event_hash(event: &EventRecord) -> KernelResult<String> {
45 let canonical = CanonicalEvent {
46 action_type: &event.action_type,
47 actor_id: &event.actor_id,
48 artifact_hash: event.artifact_hash.as_deref(),
49 event_id: &event.id,
50 log_index: event.log_index,
51 payload_hash: &event.payload_hash,
52 reserved_energy: event.reserved_energy,
53 settled_energy: event.settled_energy,
54 timestamp: &event.timestamp,
55 v: "punkgo-event-v1",
56 };
57 let json_bytes = serde_json::to_vec(&canonical)?;
58 let mut input = Vec::with_capacity(1 + json_bytes.len());
59 input.push(0x00u8);
60 input.extend_from_slice(&json_bytes);
61 Ok(bytes_to_hex(Sha256::digest(&input).as_slice()))
62}
63
/// Encodes `bytes` as a lowercase hexadecimal string, two characters per byte.
///
/// An empty slice yields an empty string.
fn bytes_to_hex(bytes: &[u8]) -> String {
    const HEX_DIGITS: [char; 16] = [
        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f',
    ];

    let mut hex = String::with_capacity(bytes.len() * 2);
    // Each byte expands to its high nibble followed by its low nibble.
    hex.extend(bytes.iter().flat_map(|&byte| {
        [
            HEX_DIGITS[usize::from(byte >> 4)],
            HEX_DIGITS[usize::from(byte & 0x0f)],
        ]
    }));
    hex
}
73
74#[derive(Clone)]
75pub struct EventLog {
76 pool: SqlitePool,
77}
78
79impl EventLog {
80 pub fn new(pool: SqlitePool) -> Self {
81 Self { pool }
82 }
83
84 pub async fn append(&self, event: &mut EventRecord) -> KernelResult<()> {
87 let mut tx = self.pool.begin().await?;
88 self.assign_and_insert(&mut tx, event).await?;
89 tx.commit().await?;
90 Ok(())
91 }
92
93 pub async fn append_in_tx(
94 &self,
95 tx: &mut Transaction<'_, Sqlite>,
96 event: &mut EventRecord,
97 ) -> KernelResult<()> {
98 self.assign_and_insert(tx, event).await
99 }
100
101 async fn assign_and_insert(
104 &self,
105 tx: &mut Transaction<'_, Sqlite>,
106 event: &mut EventRecord,
107 ) -> KernelResult<()> {
108 let row = sqlx::query("SELECT COALESCE(MAX(log_index), -1) + 1 AS next_idx FROM events")
109 .fetch_one(&mut **tx)
110 .await?;
111 event.log_index = row.get::<i64, _>("next_idx");
112 event.event_hash = compute_event_hash(event)?;
113
114 let payload_json = serde_json::to_string(&event.payload)?;
115
116 sqlx::query(
117 r#"
118 INSERT INTO events (
119 id,
120 log_index,
121 event_hash,
122 actor_id,
123 action_type,
124 target,
125 payload,
126 payload_hash,
127 artifact_hash,
128 reserved_energy,
129 settled_energy,
130 timestamp
131 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
132 "#,
133 )
134 .bind(&event.id)
135 .bind(event.log_index)
136 .bind(&event.event_hash)
137 .bind(&event.actor_id)
138 .bind(&event.action_type)
139 .bind(&event.target)
140 .bind(&payload_json)
141 .bind(&event.payload_hash)
142 .bind(event.artifact_hash.as_deref())
143 .bind(event.reserved_energy)
144 .bind(event.settled_energy)
145 .bind(&event.timestamp)
146 .execute(&mut **tx)
147 .await?;
148 Ok(())
149 }
150
151 pub async fn list_recent(&self, limit: i64) -> KernelResult<Vec<EventRecord>> {
152 let rows = sqlx::query(
153 r#"
154 SELECT id, log_index, event_hash, actor_id, action_type, target, payload,
155 payload_hash, artifact_hash, reserved_energy, settled_energy, timestamp
156 FROM events
157 ORDER BY log_index DESC
158 LIMIT ?1
159 "#,
160 )
161 .bind(limit)
162 .fetch_all(&self.pool)
163 .await?;
164
165 let events = rows
166 .into_iter()
167 .map(|row| {
168 let payload_str: String = row.get("payload");
169 EventRecord {
170 id: row.get("id"),
171 log_index: row.get("log_index"),
172 event_hash: row.get("event_hash"),
173 actor_id: row.get("actor_id"),
174 action_type: row.get("action_type"),
175 target: row.get("target"),
176 payload: serde_json::from_str(&payload_str).unwrap_or_default(),
177 payload_hash: row.get("payload_hash"),
178 artifact_hash: row.get("artifact_hash"),
179 reserved_energy: row.get("reserved_energy"),
180 settled_energy: row.get("settled_energy"),
181 timestamp: row.get("timestamp"),
182 }
183 })
184 .collect();
185 Ok(events)
186 }
187
188 pub async fn list_by_actor(
190 &self,
191 actor_id: &str,
192 limit: i64,
193 ) -> KernelResult<Vec<EventRecord>> {
194 let rows = sqlx::query(
195 r#"
196 SELECT id, log_index, event_hash, actor_id, action_type, target, payload,
197 payload_hash, artifact_hash, reserved_energy, settled_energy, timestamp
198 FROM events
199 WHERE actor_id = ?1
200 ORDER BY log_index DESC
201 LIMIT ?2
202 "#,
203 )
204 .bind(actor_id)
205 .bind(limit)
206 .fetch_all(&self.pool)
207 .await?;
208
209 let events = rows
210 .into_iter()
211 .map(|row| {
212 let payload_str: String = row.get("payload");
213 EventRecord {
214 id: row.get("id"),
215 log_index: row.get("log_index"),
216 event_hash: row.get("event_hash"),
217 actor_id: row.get("actor_id"),
218 action_type: row.get("action_type"),
219 target: row.get("target"),
220 payload: serde_json::from_str(&payload_str).unwrap_or_default(),
221 payload_hash: row.get("payload_hash"),
222 artifact_hash: row.get("artifact_hash"),
223 reserved_energy: row.get("reserved_energy"),
224 settled_energy: row.get("settled_energy"),
225 timestamp: row.get("timestamp"),
226 }
227 })
228 .collect();
229 Ok(events)
230 }
231
232 pub async fn query(
242 &self,
243 actor_id: Option<&str>,
244 before_index: Option<i64>,
245 after_index: Option<i64>,
246 limit: i64,
247 ) -> KernelResult<Vec<EventRecord>> {
248 let mut conditions: Vec<String> = Vec::new();
251 if actor_id.is_some() {
252 conditions.push("actor_id = ?".into());
253 }
254 if before_index.is_some() {
255 conditions.push("log_index < ?".into());
256 }
257 if after_index.is_some() {
258 conditions.push("log_index > ?".into());
259 }
260
261 let where_clause = if conditions.is_empty() {
262 String::new()
263 } else {
264 format!("WHERE {}", conditions.join(" AND "))
265 };
266
267 let sql = format!(
268 r#"SELECT id, log_index, event_hash, actor_id, action_type, target, payload,
269 payload_hash, artifact_hash, reserved_energy, settled_energy, timestamp
270 FROM events
271 {where_clause}
272 ORDER BY log_index DESC
273 LIMIT ?"#
274 );
275
276 let mut q = sqlx::query(&sql);
277 if let Some(a) = actor_id {
278 q = q.bind(a.to_string());
279 }
280 if let Some(b) = before_index {
281 q = q.bind(b);
282 }
283 if let Some(a) = after_index {
284 q = q.bind(a);
285 }
286 q = q.bind(limit);
287
288 let rows = q.fetch_all(&self.pool).await?;
289
290 let events = rows
291 .into_iter()
292 .map(|row| {
293 let payload_str: String = row.get("payload");
294 EventRecord {
295 id: row.get("id"),
296 log_index: row.get("log_index"),
297 event_hash: row.get("event_hash"),
298 actor_id: row.get("actor_id"),
299 action_type: row.get("action_type"),
300 target: row.get("target"),
301 payload: serde_json::from_str(&payload_str).unwrap_or_default(),
302 payload_hash: row.get("payload_hash"),
303 artifact_hash: row.get("artifact_hash"),
304 reserved_energy: row.get("reserved_energy"),
305 settled_energy: row.get("settled_energy"),
306 timestamp: row.get("timestamp"),
307 }
308 })
309 .collect();
310 Ok(events)
311 }
312
313 pub async fn count(&self) -> KernelResult<i64> {
314 let row = sqlx::query("SELECT COUNT(*) AS count FROM events")
315 .fetch_one(&self.pool)
316 .await?;
317 Ok(row.get("count"))
318 }
319}