// Source file: punkgo_kernel/state/store.rs

1use std::fs;
2use std::path::{Path, PathBuf};
3
4use punkgo_core::errors::{KernelError, KernelResult};
5use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous};
6use sqlx::{Row, SqlitePool};
7use tracing::{info, warn};
8
/// Filesystem locations that make up the kernel's on-disk state.
///
/// All paths are derived from a single state root by `StateStore::bootstrap`.
#[derive(Debug, Clone)]
pub struct StatePaths {
    /// The state root directory that every other path is joined onto.
    pub root: PathBuf,
    /// `<root>/workspaces`.
    pub workspace_root: PathBuf,
    /// `<root>/quarantine`.
    pub quarantine_root: PathBuf,
    /// `<root>/blobs`.
    pub blobs_root: PathBuf,
    /// `<root>/punkgo.db` — the SQLite database file.
    pub db_path: PathBuf,
}
17
18pub struct StateStore {
19    pool: SqlitePool,
20    paths: StatePaths,
21}
22
23impl StateStore {
24    pub async fn bootstrap(root: impl AsRef<Path>) -> KernelResult<Self> {
25        let root = root.as_ref().to_path_buf();
26        let workspace_root = root.join("workspaces");
27        let quarantine_root = root.join("quarantine");
28        let event_log_root = root.join("event_log");
29        let blobs_root = root.join("blobs");
30
31        fs::create_dir_all(&workspace_root)?;
32        fs::create_dir_all(&quarantine_root)?;
33        fs::create_dir_all(&event_log_root)?;
34        fs::create_dir_all(&blobs_root)?;
35
36        // No file-based lock — SQLite WAL handles concurrent access, and the
37        // IPC endpoint binding prevents duplicate daemon processes. The old
38        // kernel.lock approach left stale lock files after crashes.
39
40        let db_path = root.join("punkgo.db");
41        let connect_options = SqliteConnectOptions::new()
42            .filename(&db_path)
43            .create_if_missing(true)
44            .journal_mode(SqliteJournalMode::Wal)
45            .synchronous(SqliteSynchronous::Full)
46            .busy_timeout(std::time::Duration::from_secs(5));
47
48        let max_conn = std::thread::available_parallelism()
49            .map(|n| n.get() as u32)
50            .unwrap_or(8)
51            .max(8);
52
53        let pool = SqlitePoolOptions::new()
54            .max_connections(max_conn)
55            .connect_with(connect_options)
56            .await?;
57
58        Self::init_schema(&pool).await?;
59        Self::seed_defaults(&pool).await?;
60
61        let store = Self {
62            pool,
63            paths: StatePaths {
64                root,
65                workspace_root,
66                quarantine_root,
67                blobs_root,
68                db_path,
69            },
70        };
71
72        info!(state_root = %store.paths.root.display(), "state store bootstrapped");
73        store.run_startup_integrity_checks().await?;
74        Ok(store)
75    }
76
77    pub fn pool(&self) -> SqlitePool {
78        self.pool.clone()
79    }
80
81    pub fn paths(&self) -> &StatePaths {
82        &self.paths
83    }
84
85    pub async fn actor_exists(&self, actor_id: &str) -> KernelResult<bool> {
86        let row = sqlx::query("SELECT actor_id FROM actors WHERE actor_id = ?1")
87            .bind(actor_id)
88            .fetch_optional(&self.pool)
89            .await?;
90        Ok(row.is_some())
91    }
92
93    async fn init_schema(pool: &SqlitePool) -> KernelResult<()> {
94        sqlx::query(
95            r#"
96            CREATE TABLE IF NOT EXISTS events (
97                id TEXT PRIMARY KEY,
98                log_index INTEGER NOT NULL UNIQUE,
99                event_hash TEXT NOT NULL,
100                actor_id TEXT NOT NULL,
101                action_type TEXT NOT NULL,
102                target TEXT NOT NULL,
103                payload TEXT NOT NULL,
104                payload_hash TEXT NOT NULL,
105                artifact_hash TEXT,
106                reserved_energy INTEGER NOT NULL,
107                settled_energy INTEGER NOT NULL,
108                timestamp TEXT NOT NULL
109            )
110            "#,
111        )
112        .execute(pool)
113        .await?;
114
115        sqlx::query(
116            r#"
117            CREATE TABLE IF NOT EXISTS energy_ledger (
118                actor_id TEXT PRIMARY KEY,
119                energy_balance INTEGER NOT NULL,
120                reserved_energy INTEGER NOT NULL DEFAULT 0,
121                updated_at TEXT NOT NULL
122            )
123            "#,
124        )
125        .execute(pool)
126        .await?;
127
128        sqlx::query(
129            r#"
130            CREATE TABLE IF NOT EXISTS system_meta (
131                key TEXT PRIMARY KEY,
132                value TEXT NOT NULL
133            )
134            "#,
135        )
136        .execute(pool)
137        .await?;
138
139        sqlx::query(
140            r#"
141            CREATE TABLE IF NOT EXISTS audit_hashes (
142                hash_index INTEGER NOT NULL PRIMARY KEY,
143                hash BLOB NOT NULL
144            )
145            "#,
146        )
147        .execute(pool)
148        .await?;
149
150        sqlx::query(
151            r#"
152            CREATE TABLE IF NOT EXISTS audit_checkpoints (
153                tree_size INTEGER NOT NULL PRIMARY KEY,
154                root_hash TEXT NOT NULL,
155                checkpoint_text TEXT NOT NULL,
156                created_at TEXT NOT NULL
157            )
158            "#,
159        )
160        .execute(pool)
161        .await?;
162
163        // Phase 1: actors table — explicit actor records with type, lineage, boundary.
164        sqlx::query(
165            r#"
166            CREATE TABLE IF NOT EXISTS actors (
167                actor_id         TEXT PRIMARY KEY,
168                actor_type       TEXT NOT NULL CHECK(actor_type IN ('human', 'agent')),
169                creator_id       TEXT,
170                lineage          TEXT NOT NULL DEFAULT '[]',
171                purpose          TEXT,
172                status           TEXT NOT NULL DEFAULT 'active' CHECK(status IN ('active', 'frozen')),
173                writable_targets TEXT NOT NULL DEFAULT '[]',
174                energy_share     REAL NOT NULL DEFAULT 0.0,
175                reduction_policy TEXT NOT NULL DEFAULT 'none',
176                created_at       TEXT NOT NULL,
177                updated_at       TEXT NOT NULL
178            )
179            "#,
180        )
181        .execute(pool)
182        .await?;
183
184        // Phase 4b: envelopes table — budget envelope authorization system (PIP-001 §11).
185        // PIP-001 §11a: hold_on, hold_timeout_secs are part of the base schema.
186        sqlx::query(
187            r#"
188            CREATE TABLE IF NOT EXISTS envelopes (
189                envelope_id        TEXT PRIMARY KEY,
190                actor_id           TEXT NOT NULL,
191                grantor_id         TEXT NOT NULL,
192                parent_envelope_id TEXT,
193                budget             INTEGER NOT NULL,
194                budget_consumed    INTEGER NOT NULL DEFAULT 0,
195                targets            TEXT NOT NULL,
196                actions            TEXT NOT NULL,
197                duration_secs      INTEGER,
198                report_every       INTEGER,
199                hold_on            TEXT NOT NULL DEFAULT '[]',
200                hold_timeout_secs  INTEGER,
201                status             TEXT NOT NULL DEFAULT 'active'
202                                   CHECK(status IN ('active','expired','revoked')),
203                last_report_at     INTEGER NOT NULL DEFAULT 0,
204                created_at         TEXT NOT NULL,
205                expires_at         TEXT,
206                updated_at         TEXT NOT NULL
207            )
208            "#,
209        )
210        .execute(pool)
211        .await?;
212
213        // PIP-001 §11b: hold_requests table — tracks hold events and pending actions.
214        sqlx::query(
215            r#"
216            CREATE TABLE IF NOT EXISTS hold_requests (
217                hold_id          TEXT PRIMARY KEY,
218                envelope_id      TEXT NOT NULL,
219                agent_id         TEXT NOT NULL,
220                trigger_target   TEXT NOT NULL,
221                trigger_action   TEXT NOT NULL,
222                pending_payload  TEXT NOT NULL,
223                status           TEXT NOT NULL DEFAULT 'pending'
224                                 CHECK(status IN ('pending','approved','rejected','timed_out')),
225                decision         TEXT,
226                instruction      TEXT,
227                triggered_at     TEXT NOT NULL,
228                resolved_at      TEXT,
229                FOREIGN KEY (envelope_id) REFERENCES envelopes(envelope_id)
230            )
231            "#,
232        )
233        .execute(pool)
234        .await?;
235
236        Ok(())
237    }
238
239    async fn seed_defaults(pool: &SqlitePool) -> KernelResult<()> {
240        let now = now_millis_string();
241        sqlx::query(
242            r#"
243            INSERT INTO system_meta (key, value)
244            VALUES ('schema_version', 'v1')
245            ON CONFLICT(key) DO NOTHING
246            "#,
247        )
248        .execute(pool)
249        .await?;
250
251        sqlx::query(
252            r#"
253            INSERT INTO energy_ledger (actor_id, energy_balance, reserved_energy, updated_at)
254            VALUES ('root', 1000000, 0, ?1)
255            ON CONFLICT(actor_id) DO NOTHING
256            "#,
257        )
258        .bind(&now)
259        .execute(pool)
260        .await?;
261
262        // Phase 1: seed root as human actor with full wildcard boundary (PIP-001 §10).
263        sqlx::query(
264            r#"
265            INSERT INTO actors (
266                actor_id, actor_type, creator_id, lineage, purpose, status,
267                writable_targets, energy_share, reduction_policy, created_at, updated_at
268            )
269            VALUES ('root', 'human', NULL, '[]', NULL, 'active',
270                    '[{"target":"**","actions":["create","mutate","execute"]}]',
271                    100.0, 'none', ?1, ?1)
272            ON CONFLICT(actor_id) DO NOTHING
273            "#,
274        )
275        .bind(&now)
276        .execute(pool)
277        .await?;
278
279        Ok(())
280    }
281
282    pub async fn run_startup_integrity_checks(&self) -> KernelResult<()> {
283        info!("running startup integrity checks");
284        let violation = sqlx::query(
285            r#"
286            SELECT actor_id, energy_balance, reserved_energy
287            FROM energy_ledger
288            WHERE reserved_energy < 0
289               OR energy_balance < 0
290               OR reserved_energy > energy_balance
291            LIMIT 1
292            "#,
293        )
294        .fetch_optional(&self.pool)
295        .await?;
296
297        if let Some(row) = violation {
298            let actor_id: String = row.get("actor_id");
299            return Err(KernelError::PolicyViolation(format!(
300                "energy ledger consistency violation for actor {actor_id}"
301            )));
302        }
303
304        let invalid_action = sqlx::query(
305            r#"
306            SELECT action_type
307            FROM events
308            WHERE action_type NOT IN (
309                'observe', 'create', 'mutate', 'execute',
310                'circuit_breaker',
311                'hold_request', 'hold_response', 'hold_timeout'
312            )
313            LIMIT 1
314            "#,
315        )
316        .fetch_optional(&self.pool)
317        .await?;
318        if let Some(row) = invalid_action {
319            let action_type: String = row.get("action_type");
320            return Err(KernelError::PolicyViolation(format!(
321                "action integrity violation: unsupported action_type={action_type}"
322            )));
323        }
324
325        let invalid_energy_event = sqlx::query(
326            r#"
327            SELECT id, log_index, action_type, reserved_energy, settled_energy
328            FROM events
329            WHERE (action_type IN ('mutate', 'execute') AND (reserved_energy <= 0 OR settled_energy <= 0))
330               OR reserved_energy < 0
331               OR settled_energy < 0
332            LIMIT 1
333            "#,
334        )
335        .fetch_optional(&self.pool)
336        .await?;
337        if let Some(row) = invalid_energy_event {
338            let event_id: String = row.get("id");
339            return Err(KernelError::PolicyViolation(format!(
340                "energy integrity violation in event {event_id}"
341            )));
342        }
343
344        self.validate_audit_coverage().await?;
345
346        info!("startup integrity checks passed");
347        Ok(())
348    }
349
350    /// Verify audit log coverage — the latest checkpoint's tree_size must
351    /// equal the total event count, meaning every committed event is captured
352    /// in the Merkle tree. Emits a warning on gap rather than refusing to start.
353    async fn validate_audit_coverage(&self) -> KernelResult<()> {
354        let event_count: i64 = sqlx::query("SELECT COUNT(*) AS cnt FROM events")
355            .fetch_one(&self.pool)
356            .await?
357            .get("cnt");
358
359        if event_count == 0 {
360            return Ok(());
361        }
362
363        let checkpoint_tree_size: Option<i64> =
364            sqlx::query("SELECT tree_size FROM audit_checkpoints ORDER BY tree_size DESC LIMIT 1")
365                .fetch_optional(&self.pool)
366                .await?
367                .map(|r| r.get("tree_size"));
368
369        match checkpoint_tree_size {
370            None => {
371                warn!(
372                    event_count,
373                    "audit coverage gap: {} event(s) committed but no audit checkpoint exists",
374                    event_count
375                );
376            }
377            Some(tree_size) if tree_size < event_count => {
378                warn!(
379                    event_count,
380                    checkpoint_tree_size = tree_size,
381                    "audit coverage gap: {} event(s) not yet covered by Merkle checkpoint",
382                    event_count - tree_size
383                );
384            }
385            _ => {}
386        }
387
388        Ok(())
389    }
390
391    /// Records a governance policy version change in system_meta.
392    /// Called by the kernel whenever a `system/policy` Create event is committed.
393    pub async fn set_policy_version(&self, version: &str) -> KernelResult<()> {
394        self.set_meta("policy_version", version).await
395    }
396
397    async fn set_meta(&self, key: &str, value: &str) -> KernelResult<()> {
398        sqlx::query(
399            r#"
400            INSERT INTO system_meta (key, value)
401            VALUES (?1, ?2)
402            ON CONFLICT(key) DO UPDATE SET value = excluded.value
403            "#,
404        )
405        .bind(key)
406        .bind(value)
407        .execute(&self.pool)
408        .await?;
409        Ok(())
410    }
411}
412
/// Returns the current wall-clock time as a decimal string of milliseconds
/// since the Unix epoch.
///
/// If the system clock reports a time before the epoch, the elapsed duration
/// falls back to zero and the result is `"0"`.
fn now_millis_string() -> String {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    format!("{}", since_epoch.as_millis())
}