//! punkgo_kernel/state/event_log.rs — append-only, hash-chained event log
//! persisted in SQLite.
1use serde::{Deserialize, Serialize};
2use sha2::{Digest, Sha256};
3use sqlx::{Row, Sqlite, SqlitePool, Transaction};
4
5use punkgo_core::errors::KernelResult;
6
/// One immutable entry of the append-only event log, mirroring a row of
/// the `events` table.
///
/// `log_index` and `event_hash` are assigned at append time by
/// `EventLog::assign_and_insert`; callers populate the remaining fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventRecord {
    /// Caller-supplied unique event identifier.
    pub id: String,
    /// Monotonic position in the log; assigned as MAX(log_index)+1 on insert.
    pub log_index: i64,
    /// SHA-256 leaf hash of the canonical encoding (see `compute_event_hash`).
    pub event_hash: String,
    /// Identifier of the actor that produced this event.
    pub actor_id: String,
    /// Kind of action recorded (semantics defined by callers — not visible here).
    pub action_type: String,
    /// Target of the action; stored but not part of the canonical hash input.
    pub target: String,
    /// Arbitrary JSON payload; stored as a JSON string in SQLite.
    pub payload: serde_json::Value,
    /// Precomputed hash of `payload` (supplied by the caller; hashed into
    /// `event_hash` in place of the full payload).
    pub payload_hash: String,
    /// SHA-256(stdout || stderr) for execute actions; None otherwise.
    pub artifact_hash: Option<String>,
    /// Energy reserved before execution (units defined elsewhere in the kernel).
    pub reserved_energy: i64,
    /// Energy actually consumed after settlement.
    pub settled_energy: i64,
    /// Event timestamp, stored as a string (format not enforced here —
    /// presumably RFC 3339; confirm against producers).
    pub timestamp: String,
}
23
/// Canonical event encoding for RFC 8785 (JCS) — fields MUST be in
/// alphabetical Unicode codepoint order for deterministic hashing.
/// `artifact_hash` is omitted when None (execute outputs absent).
///
/// Borrows from an `EventRecord` so hashing allocates nothing beyond the
/// JSON buffer. Do NOT reorder, rename, or add fields without bumping `v`:
/// serde serializes struct fields in declaration order, so this declaration
/// order IS the hash contract.
#[derive(Serialize)]
struct CanonicalEvent<'a> {
    action_type: &'a str,
    actor_id: &'a str,
    // Skipped entirely (not serialized as null) when absent, so
    // non-execute events hash identically whether or not the field exists.
    #[serde(skip_serializing_if = "Option::is_none")]
    artifact_hash: Option<&'a str>,
    event_id: &'a str,
    log_index: i64,
    payload_hash: &'a str,
    reserved_energy: i64,
    settled_energy: i64,
    timestamp: &'a str,
    // Encoding version tag; pinned to "punkgo-event-v1" in compute_event_hash
    // so a future schema change yields distinct hashes.
    v: &'static str,
}
41
42/// Computes event_hash = SHA-256(0x00 || canonical_json_bytes)
43/// per RFC 6962 leaf domain separation.
44fn compute_event_hash(event: &EventRecord) -> KernelResult<String> {
45    let canonical = CanonicalEvent {
46        action_type: &event.action_type,
47        actor_id: &event.actor_id,
48        artifact_hash: event.artifact_hash.as_deref(),
49        event_id: &event.id,
50        log_index: event.log_index,
51        payload_hash: &event.payload_hash,
52        reserved_energy: event.reserved_energy,
53        settled_energy: event.settled_energy,
54        timestamp: &event.timestamp,
55        v: "punkgo-event-v1",
56    };
57    let json_bytes = serde_json::to_vec(&canonical)?;
58    let mut input = Vec::with_capacity(1 + json_bytes.len());
59    input.push(0x00u8);
60    input.extend_from_slice(&json_bytes);
61    Ok(bytes_to_hex(Sha256::digest(&input).as_slice()))
62}
63
/// Encodes a byte slice as a lowercase hexadecimal string
/// (two hex digits per input byte).
fn bytes_to_hex(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{b:02x}")).collect()
}
73
/// Append-only event log backed by the SQLite `events` table.
///
/// Cloning is cheap — it clones the underlying `SqlitePool` handle.
#[derive(Clone)]
pub struct EventLog {
    pool: SqlitePool,
}
78
79impl EventLog {
80    pub fn new(pool: SqlitePool) -> Self {
81        Self { pool }
82    }
83
84    /// Appends an event, assigning a monotonic log_index and computing
85    /// event_hash in place. Wraps in an internal transaction for atomicity.
86    pub async fn append(&self, event: &mut EventRecord) -> KernelResult<()> {
87        let mut tx = self.pool.begin().await?;
88        self.assign_and_insert(&mut tx, event).await?;
89        tx.commit().await?;
90        Ok(())
91    }
92
93    pub async fn append_in_tx(
94        &self,
95        tx: &mut Transaction<'_, Sqlite>,
96        event: &mut EventRecord,
97    ) -> KernelResult<()> {
98        self.assign_and_insert(tx, event).await
99    }
100
101    /// Atomically assigns log_index (MAX+1 within tx), computes event_hash,
102    /// then inserts the complete event record.
103    async fn assign_and_insert(
104        &self,
105        tx: &mut Transaction<'_, Sqlite>,
106        event: &mut EventRecord,
107    ) -> KernelResult<()> {
108        let row = sqlx::query("SELECT COALESCE(MAX(log_index), -1) + 1 AS next_idx FROM events")
109            .fetch_one(&mut **tx)
110            .await?;
111        event.log_index = row.get::<i64, _>("next_idx");
112        event.event_hash = compute_event_hash(event)?;
113
114        let payload_json = serde_json::to_string(&event.payload)?;
115
116        sqlx::query(
117            r#"
118            INSERT INTO events (
119                id,
120                log_index,
121                event_hash,
122                actor_id,
123                action_type,
124                target,
125                payload,
126                payload_hash,
127                artifact_hash,
128                reserved_energy,
129                settled_energy,
130                timestamp
131            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
132            "#,
133        )
134        .bind(&event.id)
135        .bind(event.log_index)
136        .bind(&event.event_hash)
137        .bind(&event.actor_id)
138        .bind(&event.action_type)
139        .bind(&event.target)
140        .bind(&payload_json)
141        .bind(&event.payload_hash)
142        .bind(event.artifact_hash.as_deref())
143        .bind(event.reserved_energy)
144        .bind(event.settled_energy)
145        .bind(&event.timestamp)
146        .execute(&mut **tx)
147        .await?;
148        Ok(())
149    }
150
151    pub async fn list_recent(&self, limit: i64) -> KernelResult<Vec<EventRecord>> {
152        let rows = sqlx::query(
153            r#"
154            SELECT id, log_index, event_hash, actor_id, action_type, target, payload,
155                   payload_hash, artifact_hash, reserved_energy, settled_energy, timestamp
156            FROM events
157            ORDER BY log_index DESC
158            LIMIT ?1
159            "#,
160        )
161        .bind(limit)
162        .fetch_all(&self.pool)
163        .await?;
164
165        let events = rows
166            .into_iter()
167            .map(|row| {
168                let payload_str: String = row.get("payload");
169                EventRecord {
170                    id: row.get("id"),
171                    log_index: row.get("log_index"),
172                    event_hash: row.get("event_hash"),
173                    actor_id: row.get("actor_id"),
174                    action_type: row.get("action_type"),
175                    target: row.get("target"),
176                    payload: serde_json::from_str(&payload_str).unwrap_or_default(),
177                    payload_hash: row.get("payload_hash"),
178                    artifact_hash: row.get("artifact_hash"),
179                    reserved_energy: row.get("reserved_energy"),
180                    settled_energy: row.get("settled_energy"),
181                    timestamp: row.get("timestamp"),
182                }
183            })
184            .collect();
185        Ok(events)
186    }
187
188    /// List events filtered by actor_id, ordered by log_index descending.
189    pub async fn list_by_actor(
190        &self,
191        actor_id: &str,
192        limit: i64,
193    ) -> KernelResult<Vec<EventRecord>> {
194        let rows = sqlx::query(
195            r#"
196            SELECT id, log_index, event_hash, actor_id, action_type, target, payload,
197                   payload_hash, artifact_hash, reserved_energy, settled_energy, timestamp
198            FROM events
199            WHERE actor_id = ?1
200            ORDER BY log_index DESC
201            LIMIT ?2
202            "#,
203        )
204        .bind(actor_id)
205        .bind(limit)
206        .fetch_all(&self.pool)
207        .await?;
208
209        let events = rows
210            .into_iter()
211            .map(|row| {
212                let payload_str: String = row.get("payload");
213                EventRecord {
214                    id: row.get("id"),
215                    log_index: row.get("log_index"),
216                    event_hash: row.get("event_hash"),
217                    actor_id: row.get("actor_id"),
218                    action_type: row.get("action_type"),
219                    target: row.get("target"),
220                    payload: serde_json::from_str(&payload_str).unwrap_or_default(),
221                    payload_hash: row.get("payload_hash"),
222                    artifact_hash: row.get("artifact_hash"),
223                    reserved_energy: row.get("reserved_energy"),
224                    settled_energy: row.get("settled_energy"),
225                    timestamp: row.get("timestamp"),
226                }
227            })
228            .collect();
229        Ok(events)
230    }
231
232    /// General-purpose event query with composable filters.
233    ///
234    /// All filter fields are optional and combined with AND logic:
235    /// - `actor_id`: restrict to a specific actor
236    /// - `before_index`: only events with log_index < value (for backward pagination)
237    /// - `after_index`: only events with log_index > value (for forward pagination)
238    /// - `limit`: max results (clamped by caller; this method does not clamp)
239    ///
240    /// Returns events ordered by log_index descending (newest first).
241    pub async fn query(
242        &self,
243        actor_id: Option<&str>,
244        before_index: Option<i64>,
245        after_index: Option<i64>,
246        limit: i64,
247    ) -> KernelResult<Vec<EventRecord>> {
248        // Build WHERE clauses dynamically. We use a Vec to keep things clean
249        // and avoid the NULL-parameter anti-pattern that defeats index usage.
250        let mut conditions: Vec<String> = Vec::new();
251        if actor_id.is_some() {
252            conditions.push("actor_id = ?".into());
253        }
254        if before_index.is_some() {
255            conditions.push("log_index < ?".into());
256        }
257        if after_index.is_some() {
258            conditions.push("log_index > ?".into());
259        }
260
261        let where_clause = if conditions.is_empty() {
262            String::new()
263        } else {
264            format!("WHERE {}", conditions.join(" AND "))
265        };
266
267        let sql = format!(
268            r#"SELECT id, log_index, event_hash, actor_id, action_type, target, payload,
269                      payload_hash, artifact_hash, reserved_energy, settled_energy, timestamp
270               FROM events
271               {where_clause}
272               ORDER BY log_index DESC
273               LIMIT ?"#
274        );
275
276        let mut q = sqlx::query(&sql);
277        if let Some(a) = actor_id {
278            q = q.bind(a.to_string());
279        }
280        if let Some(b) = before_index {
281            q = q.bind(b);
282        }
283        if let Some(a) = after_index {
284            q = q.bind(a);
285        }
286        q = q.bind(limit);
287
288        let rows = q.fetch_all(&self.pool).await?;
289
290        let events = rows
291            .into_iter()
292            .map(|row| {
293                let payload_str: String = row.get("payload");
294                EventRecord {
295                    id: row.get("id"),
296                    log_index: row.get("log_index"),
297                    event_hash: row.get("event_hash"),
298                    actor_id: row.get("actor_id"),
299                    action_type: row.get("action_type"),
300                    target: row.get("target"),
301                    payload: serde_json::from_str(&payload_str).unwrap_or_default(),
302                    payload_hash: row.get("payload_hash"),
303                    artifact_hash: row.get("artifact_hash"),
304                    reserved_energy: row.get("reserved_energy"),
305                    settled_energy: row.get("settled_energy"),
306                    timestamp: row.get("timestamp"),
307                }
308            })
309            .collect();
310        Ok(events)
311    }
312
313    pub async fn count(&self) -> KernelResult<i64> {
314        let row = sqlx::query("SELECT COUNT(*) AS count FROM events")
315            .fetch_one(&self.pool)
316            .await?;
317        Ok(row.get("count"))
318    }
319}