// punkgo_kernel/state/store.rs

1use std::fs;
2use std::path::{Path, PathBuf};
3
4use punkgo_core::errors::{KernelError, KernelResult};
5use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous};
6use sqlx::{Row, SqlitePool};
7use tracing::{info, warn};
8
/// Filesystem layout of a kernel state directory.
///
/// As built by `StateStore::bootstrap`, every path other than `root` is a
/// direct child of `root`.
#[derive(Clone, Debug)]
pub struct StatePaths {
    /// The state directory passed to `StateStore::bootstrap`.
    pub root: PathBuf,
    /// `<root>/workspaces`.
    pub workspace_root: PathBuf,
    /// `<root>/quarantine`.
    pub quarantine_root: PathBuf,
    /// `<root>/blobs`.
    pub blobs_root: PathBuf,
    /// `<root>/punkgo.db`, the SQLite database file.
    pub db_path: PathBuf,
}
17
18pub struct StateStore {
19    pool: SqlitePool,
20    paths: StatePaths,
21}
22
23impl StateStore {
24    pub async fn bootstrap(root: impl AsRef<Path>) -> KernelResult<Self> {
25        let root = root.as_ref().to_path_buf();
26        let workspace_root = root.join("workspaces");
27        let quarantine_root = root.join("quarantine");
28        let event_log_root = root.join("event_log");
29        let blobs_root = root.join("blobs");
30
31        fs::create_dir_all(&workspace_root)?;
32        fs::create_dir_all(&quarantine_root)?;
33        fs::create_dir_all(&event_log_root)?;
34        fs::create_dir_all(&blobs_root)?;
35
36        // No file-based lock — SQLite WAL handles concurrent access, and the
37        // IPC endpoint binding prevents duplicate daemon processes. The old
38        // kernel.lock approach left stale lock files after crashes.
39
40        let db_path = root.join("punkgo.db");
41        let connect_options = SqliteConnectOptions::new()
42            .filename(&db_path)
43            .create_if_missing(true)
44            .journal_mode(SqliteJournalMode::Wal)
45            .synchronous(SqliteSynchronous::Full)
46            .busy_timeout(std::time::Duration::from_secs(5));
47
48        let max_conn = std::thread::available_parallelism()
49            .map(|n| n.get() as u32)
50            .unwrap_or(8)
51            .max(8);
52
53        let pool = SqlitePoolOptions::new()
54            .max_connections(max_conn)
55            .connect_with(connect_options)
56            .await?;
57
58        Self::init_schema(&pool).await?;
59        Self::seed_defaults(&pool).await?;
60
61        let store = Self {
62            pool,
63            paths: StatePaths {
64                root,
65                workspace_root,
66                quarantine_root,
67                blobs_root,
68                db_path,
69            },
70        };
71
72        info!(state_root = %store.paths.root.display(), "state store bootstrapped");
73        store.run_startup_integrity_checks().await?;
74        Ok(store)
75    }
76
77    pub fn pool(&self) -> SqlitePool {
78        self.pool.clone()
79    }
80
81    pub fn paths(&self) -> &StatePaths {
82        &self.paths
83    }
84
85    pub async fn actor_exists(&self, actor_id: &str) -> KernelResult<bool> {
86        let row = sqlx::query("SELECT actor_id FROM actors WHERE actor_id = ?1")
87            .bind(actor_id)
88            .fetch_optional(&self.pool)
89            .await?;
90        Ok(row.is_some())
91    }
92
93    async fn init_schema(pool: &SqlitePool) -> KernelResult<()> {
94        sqlx::query(
95            r#"
96            CREATE TABLE IF NOT EXISTS events (
97                id TEXT PRIMARY KEY,
98                log_index INTEGER NOT NULL UNIQUE,
99                event_hash TEXT NOT NULL,
100                actor_id TEXT NOT NULL,
101                action_type TEXT NOT NULL,
102                target TEXT NOT NULL,
103                payload TEXT NOT NULL,
104                payload_hash TEXT NOT NULL,
105                artifact_hash TEXT,
106                reserved_energy INTEGER NOT NULL,
107                settled_energy INTEGER NOT NULL,
108                timestamp TEXT NOT NULL
109            )
110            "#,
111        )
112        .execute(pool)
113        .await?;
114
115        sqlx::query(
116            r#"
117            CREATE TABLE IF NOT EXISTS energy_ledger (
118                actor_id TEXT PRIMARY KEY,
119                energy_balance INTEGER NOT NULL,
120                reserved_energy INTEGER NOT NULL DEFAULT 0,
121                updated_at TEXT NOT NULL
122            )
123            "#,
124        )
125        .execute(pool)
126        .await?;
127
128        sqlx::query(
129            r#"
130            CREATE TABLE IF NOT EXISTS system_meta (
131                key TEXT PRIMARY KEY,
132                value TEXT NOT NULL
133            )
134            "#,
135        )
136        .execute(pool)
137        .await?;
138
139        sqlx::query(
140            r#"
141            CREATE TABLE IF NOT EXISTS audit_hashes (
142                hash_index INTEGER NOT NULL PRIMARY KEY,
143                hash BLOB NOT NULL
144            )
145            "#,
146        )
147        .execute(pool)
148        .await?;
149
150        sqlx::query(
151            r#"
152            CREATE TABLE IF NOT EXISTS audit_checkpoints (
153                tree_size INTEGER NOT NULL PRIMARY KEY,
154                root_hash TEXT NOT NULL,
155                checkpoint_text TEXT NOT NULL,
156                created_at TEXT NOT NULL
157            )
158            "#,
159        )
160        .execute(pool)
161        .await?;
162
163        // Phase 1.5: TSA timestamp tokens — external time anchoring for checkpoints.
164        sqlx::query(
165            r#"
166            CREATE TABLE IF NOT EXISTS audit_tsa_tokens (
167                tree_size INTEGER NOT NULL PRIMARY KEY,
168                root_hash TEXT NOT NULL,
169                tsa_response BLOB NOT NULL,
170                tsa_url TEXT NOT NULL,
171                created_at TEXT NOT NULL
172            )
173            "#,
174        )
175        .execute(pool)
176        .await?;
177
178        // Phase 1: actors table — explicit actor records with type, lineage, boundary.
179        sqlx::query(
180            r#"
181            CREATE TABLE IF NOT EXISTS actors (
182                actor_id         TEXT PRIMARY KEY,
183                actor_type       TEXT NOT NULL CHECK(actor_type IN ('human', 'agent')),
184                creator_id       TEXT,
185                lineage          TEXT NOT NULL DEFAULT '[]',
186                purpose          TEXT,
187                status           TEXT NOT NULL DEFAULT 'active' CHECK(status IN ('active', 'frozen')),
188                writable_targets TEXT NOT NULL DEFAULT '[]',
189                energy_share     REAL NOT NULL DEFAULT 0.0,
190                reduction_policy TEXT NOT NULL DEFAULT 'none',
191                created_at       TEXT NOT NULL,
192                updated_at       TEXT NOT NULL
193            )
194            "#,
195        )
196        .execute(pool)
197        .await?;
198
199        // Phase 4b: envelopes table — budget envelope authorization system (PIP-001 §11).
200        // PIP-001 §11a: hold_on, hold_timeout_secs are part of the base schema.
201        sqlx::query(
202            r#"
203            CREATE TABLE IF NOT EXISTS envelopes (
204                envelope_id        TEXT PRIMARY KEY,
205                actor_id           TEXT NOT NULL,
206                grantor_id         TEXT NOT NULL,
207                parent_envelope_id TEXT,
208                budget             INTEGER NOT NULL,
209                budget_consumed    INTEGER NOT NULL DEFAULT 0,
210                targets            TEXT NOT NULL,
211                actions            TEXT NOT NULL,
212                duration_secs      INTEGER,
213                report_every       INTEGER,
214                hold_on            TEXT NOT NULL DEFAULT '[]',
215                hold_timeout_secs  INTEGER,
216                status             TEXT NOT NULL DEFAULT 'active'
217                                   CHECK(status IN ('active','expired','revoked')),
218                last_report_at     INTEGER NOT NULL DEFAULT 0,
219                created_at         TEXT NOT NULL,
220                expires_at         TEXT,
221                updated_at         TEXT NOT NULL
222            )
223            "#,
224        )
225        .execute(pool)
226        .await?;
227
228        // PIP-001 §11b: hold_requests table — tracks hold events and pending actions.
229        sqlx::query(
230            r#"
231            CREATE TABLE IF NOT EXISTS hold_requests (
232                hold_id          TEXT PRIMARY KEY,
233                envelope_id      TEXT NOT NULL,
234                agent_id         TEXT NOT NULL,
235                trigger_target   TEXT NOT NULL,
236                trigger_action   TEXT NOT NULL,
237                pending_payload  TEXT NOT NULL,
238                status           TEXT NOT NULL DEFAULT 'pending'
239                                 CHECK(status IN ('pending','approved','rejected','timed_out')),
240                decision         TEXT,
241                instruction      TEXT,
242                triggered_at     TEXT NOT NULL,
243                resolved_at      TEXT,
244                FOREIGN KEY (envelope_id) REFERENCES envelopes(envelope_id)
245            )
246            "#,
247        )
248        .execute(pool)
249        .await?;
250
251        Ok(())
252    }
253
254    async fn seed_defaults(pool: &SqlitePool) -> KernelResult<()> {
255        let now = now_millis_string();
256        sqlx::query(
257            r#"
258            INSERT INTO system_meta (key, value)
259            VALUES ('schema_version', 'v1')
260            ON CONFLICT(key) DO NOTHING
261            "#,
262        )
263        .execute(pool)
264        .await?;
265
266        sqlx::query(
267            r#"
268            INSERT INTO energy_ledger (actor_id, energy_balance, reserved_energy, updated_at)
269            VALUES ('root', 1000000, 0, ?1)
270            ON CONFLICT(actor_id) DO NOTHING
271            "#,
272        )
273        .bind(&now)
274        .execute(pool)
275        .await?;
276
277        // Phase 1: seed root as human actor with full wildcard boundary (PIP-001 §10).
278        // Root is a genesis actor — it receives a one-time energy balance but does not
279        // participate in tick-based energy distribution (energy_share = 0.0).
280        // Only agents receive ongoing energy production.
281        sqlx::query(
282            r#"
283            INSERT INTO actors (
284                actor_id, actor_type, creator_id, lineage, purpose, status,
285                writable_targets, energy_share, reduction_policy, created_at, updated_at
286            )
287            VALUES ('root', 'human', NULL, '[]', NULL, 'active',
288                    '[{"target":"**","actions":["create","mutate","execute"]}]',
289                    0.0, 'none', ?1, ?1)
290            ON CONFLICT(actor_id) DO NOTHING
291            "#,
292        )
293        .bind(&now)
294        .execute(pool)
295        .await?;
296
297        Ok(())
298    }
299
300    pub async fn run_startup_integrity_checks(&self) -> KernelResult<()> {
301        info!("running startup integrity checks");
302        let violation = sqlx::query(
303            r#"
304            SELECT actor_id, energy_balance, reserved_energy
305            FROM energy_ledger
306            WHERE reserved_energy < 0
307               OR energy_balance < 0
308               OR reserved_energy > energy_balance
309            LIMIT 1
310            "#,
311        )
312        .fetch_optional(&self.pool)
313        .await?;
314
315        if let Some(row) = violation {
316            let actor_id: String = row.get("actor_id");
317            return Err(KernelError::PolicyViolation(format!(
318                "energy ledger consistency violation for actor {actor_id}"
319            )));
320        }
321
322        let invalid_action = sqlx::query(
323            r#"
324            SELECT action_type
325            FROM events
326            WHERE action_type NOT IN (
327                'observe', 'create', 'mutate', 'execute',
328                'circuit_breaker',
329                'hold_request', 'hold_response', 'hold_timeout'
330            )
331            LIMIT 1
332            "#,
333        )
334        .fetch_optional(&self.pool)
335        .await?;
336        if let Some(row) = invalid_action {
337            let action_type: String = row.get("action_type");
338            return Err(KernelError::PolicyViolation(format!(
339                "action integrity violation: unsupported action_type={action_type}"
340            )));
341        }
342
343        let invalid_energy_event = sqlx::query(
344            r#"
345            SELECT id, log_index, action_type, reserved_energy, settled_energy
346            FROM events
347            WHERE (action_type IN ('mutate', 'execute') AND (reserved_energy <= 0 OR settled_energy <= 0))
348               OR reserved_energy < 0
349               OR settled_energy < 0
350            LIMIT 1
351            "#,
352        )
353        .fetch_optional(&self.pool)
354        .await?;
355        if let Some(row) = invalid_energy_event {
356            let event_id: String = row.get("id");
357            return Err(KernelError::PolicyViolation(format!(
358                "energy integrity violation in event {event_id}"
359            )));
360        }
361
362        self.validate_audit_coverage().await?;
363
364        info!("startup integrity checks passed");
365        Ok(())
366    }
367
368    /// Verify audit log coverage — the latest checkpoint's tree_size must
369    /// equal the total event count, meaning every committed event is captured
370    /// in the Merkle tree. Emits a warning on gap rather than refusing to start.
371    async fn validate_audit_coverage(&self) -> KernelResult<()> {
372        let event_count: i64 = sqlx::query("SELECT COUNT(*) AS cnt FROM events")
373            .fetch_one(&self.pool)
374            .await?
375            .get("cnt");
376
377        if event_count == 0 {
378            return Ok(());
379        }
380
381        let checkpoint_tree_size: Option<i64> =
382            sqlx::query("SELECT tree_size FROM audit_checkpoints ORDER BY tree_size DESC LIMIT 1")
383                .fetch_optional(&self.pool)
384                .await?
385                .map(|r| r.get("tree_size"));
386
387        match checkpoint_tree_size {
388            None => {
389                warn!(
390                    event_count,
391                    "audit coverage gap: {} event(s) committed but no audit checkpoint exists",
392                    event_count
393                );
394            }
395            Some(tree_size) if tree_size < event_count => {
396                warn!(
397                    event_count,
398                    checkpoint_tree_size = tree_size,
399                    "audit coverage gap: {} event(s) not yet covered by Merkle checkpoint",
400                    event_count - tree_size
401                );
402            }
403            _ => {}
404        }
405
406        Ok(())
407    }
408
409    /// Records a governance policy version change in system_meta.
410    /// Called by the kernel whenever a `system/policy` Create event is committed.
411    pub async fn set_policy_version(&self, version: &str) -> KernelResult<()> {
412        self.set_meta("policy_version", version).await
413    }
414
415    async fn set_meta(&self, key: &str, value: &str) -> KernelResult<()> {
416        sqlx::query(
417            r#"
418            INSERT INTO system_meta (key, value)
419            VALUES (?1, ?2)
420            ON CONFLICT(key) DO UPDATE SET value = excluded.value
421            "#,
422        )
423        .bind(key)
424        .bind(value)
425        .execute(&self.pool)
426        .await?;
427        Ok(())
428    }
429}
430
/// Current wall-clock time as whole milliseconds since the Unix epoch,
/// rendered as a decimal string.
///
/// If the system clock reads earlier than the epoch, the result is `"0"`
/// instead of an error.
fn now_millis_string() -> String {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_millis())
        .to_string()
}