Skip to main content

punkgo_kernel/state/
store.rs

1use std::fs;
2use std::path::{Path, PathBuf};
3
4use punkgo_core::errors::{KernelError, KernelResult};
5use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous};
6use sqlx::{Row, SqlitePool};
7use tracing::{info, warn};
8
/// Filesystem locations that make up the kernel's on-disk state.
///
/// The per-field notes describe the values `StateStore::bootstrap` assigns;
/// code that builds a `StatePaths` by hand may use any paths it likes.
#[derive(Clone, Debug)]
pub struct StatePaths {
    /// State root directory (the path handed to `StateStore::bootstrap`).
    pub root: PathBuf,
    /// Workspace directory; bootstrap uses `<root>/workspaces`.
    pub workspace_root: PathBuf,
    /// Quarantine directory; bootstrap uses `<root>/quarantine`.
    pub quarantine_root: PathBuf,
    /// Blob directory; bootstrap uses `<root>/blobs`.
    pub blobs_root: PathBuf,
    /// SQLite database file; bootstrap uses `<root>/punkgo.db`.
    pub db_path: PathBuf,
}
17
/// Handle to the kernel's persistent state: a SQLite connection pool plus the
/// filesystem layout the store was bootstrapped with.
pub struct StateStore {
    /// Connection pool opened by `bootstrap` on the database at `paths.db_path`.
    pool: SqlitePool,
    /// Directory and file locations resolved during `bootstrap`.
    paths: StatePaths,
}
22
23impl StateStore {
24    pub async fn bootstrap(root: impl AsRef<Path>) -> KernelResult<Self> {
25        let root = root.as_ref().to_path_buf();
26        let workspace_root = root.join("workspaces");
27        let quarantine_root = root.join("quarantine");
28        let event_log_root = root.join("event_log");
29        let blobs_root = root.join("blobs");
30
31        fs::create_dir_all(&workspace_root)?;
32        fs::create_dir_all(&quarantine_root)?;
33        fs::create_dir_all(&event_log_root)?;
34        fs::create_dir_all(&blobs_root)?;
35
36        // No file-based lock — SQLite WAL handles concurrent access, and the
37        // IPC endpoint binding prevents duplicate daemon processes. The old
38        // kernel.lock approach left stale lock files after crashes.
39
40        let db_path = root.join("punkgo.db");
41        let connect_options = SqliteConnectOptions::new()
42            .filename(&db_path)
43            .create_if_missing(true)
44            .journal_mode(SqliteJournalMode::Wal)
45            .synchronous(SqliteSynchronous::Full)
46            .busy_timeout(std::time::Duration::from_secs(5));
47
48        let max_conn = std::thread::available_parallelism()
49            .map(|n| n.get() as u32)
50            .unwrap_or(8)
51            .max(8);
52
53        let pool = SqlitePoolOptions::new()
54            .max_connections(max_conn)
55            .connect_with(connect_options)
56            .await?;
57
58        Self::init_schema(&pool).await?;
59        Self::seed_defaults(&pool).await?;
60
61        let store = Self {
62            pool,
63            paths: StatePaths {
64                root,
65                workspace_root,
66                quarantine_root,
67                blobs_root,
68                db_path,
69            },
70        };
71
72        info!(state_root = %store.paths.root.display(), "state store bootstrapped");
73        store.run_startup_integrity_checks().await?;
74        Ok(store)
75    }
76
77    pub fn pool(&self) -> SqlitePool {
78        self.pool.clone()
79    }
80
81    pub fn paths(&self) -> &StatePaths {
82        &self.paths
83    }
84
85    pub async fn actor_exists(&self, actor_id: &str) -> KernelResult<bool> {
86        let row = sqlx::query("SELECT actor_id FROM actors WHERE actor_id = ?1")
87            .bind(actor_id)
88            .fetch_optional(&self.pool)
89            .await?;
90        Ok(row.is_some())
91    }
92
93    async fn init_schema(pool: &SqlitePool) -> KernelResult<()> {
94        sqlx::query(
95            r#"
96            CREATE TABLE IF NOT EXISTS events (
97                id TEXT PRIMARY KEY,
98                log_index INTEGER NOT NULL UNIQUE,
99                event_hash TEXT NOT NULL,
100                actor_id TEXT NOT NULL,
101                action_type TEXT NOT NULL,
102                target TEXT NOT NULL,
103                payload TEXT NOT NULL,
104                payload_hash TEXT NOT NULL,
105                artifact_hash TEXT,
106                reserved_energy INTEGER NOT NULL,
107                settled_energy INTEGER NOT NULL,
108                timestamp TEXT NOT NULL
109            )
110            "#,
111        )
112        .execute(pool)
113        .await?;
114
115        sqlx::query(
116            r#"
117            CREATE TABLE IF NOT EXISTS energy_ledger (
118                actor_id TEXT PRIMARY KEY,
119                energy_balance INTEGER NOT NULL,
120                reserved_energy INTEGER NOT NULL DEFAULT 0,
121                updated_at TEXT NOT NULL
122            )
123            "#,
124        )
125        .execute(pool)
126        .await?;
127
128        sqlx::query(
129            r#"
130            CREATE TABLE IF NOT EXISTS system_meta (
131                key TEXT PRIMARY KEY,
132                value TEXT NOT NULL
133            )
134            "#,
135        )
136        .execute(pool)
137        .await?;
138
139        sqlx::query(
140            r#"
141            CREATE TABLE IF NOT EXISTS audit_hashes (
142                hash_index INTEGER NOT NULL PRIMARY KEY,
143                hash BLOB NOT NULL
144            )
145            "#,
146        )
147        .execute(pool)
148        .await?;
149
150        sqlx::query(
151            r#"
152            CREATE TABLE IF NOT EXISTS audit_checkpoints (
153                tree_size INTEGER NOT NULL PRIMARY KEY,
154                root_hash TEXT NOT NULL,
155                checkpoint_text TEXT NOT NULL,
156                created_at TEXT NOT NULL
157            )
158            "#,
159        )
160        .execute(pool)
161        .await?;
162
163        // Phase 1: actors table — explicit actor records with type, lineage, boundary.
164        sqlx::query(
165            r#"
166            CREATE TABLE IF NOT EXISTS actors (
167                actor_id         TEXT PRIMARY KEY,
168                actor_type       TEXT NOT NULL CHECK(actor_type IN ('human', 'agent')),
169                creator_id       TEXT,
170                lineage          TEXT NOT NULL DEFAULT '[]',
171                purpose          TEXT,
172                status           TEXT NOT NULL DEFAULT 'active' CHECK(status IN ('active', 'frozen')),
173                writable_targets TEXT NOT NULL DEFAULT '[]',
174                energy_share     REAL NOT NULL DEFAULT 0.0,
175                reduction_policy TEXT NOT NULL DEFAULT 'none',
176                created_at       TEXT NOT NULL,
177                updated_at       TEXT NOT NULL
178            )
179            "#,
180        )
181        .execute(pool)
182        .await?;
183
184        // Phase 4b: envelopes table — budget envelope authorization system (PIP-001 §11).
185        // PIP-001 §11a: hold_on, hold_timeout_secs are part of the base schema.
186        sqlx::query(
187            r#"
188            CREATE TABLE IF NOT EXISTS envelopes (
189                envelope_id        TEXT PRIMARY KEY,
190                actor_id           TEXT NOT NULL,
191                grantor_id         TEXT NOT NULL,
192                parent_envelope_id TEXT,
193                budget             INTEGER NOT NULL,
194                budget_consumed    INTEGER NOT NULL DEFAULT 0,
195                targets            TEXT NOT NULL,
196                actions            TEXT NOT NULL,
197                duration_secs      INTEGER,
198                report_every       INTEGER,
199                hold_on            TEXT NOT NULL DEFAULT '[]',
200                hold_timeout_secs  INTEGER,
201                status             TEXT NOT NULL DEFAULT 'active'
202                                   CHECK(status IN ('active','expired','revoked')),
203                last_report_at     INTEGER NOT NULL DEFAULT 0,
204                created_at         TEXT NOT NULL,
205                expires_at         TEXT,
206                updated_at         TEXT NOT NULL
207            )
208            "#,
209        )
210        .execute(pool)
211        .await?;
212
213        // PIP-001 §11b: hold_requests table — tracks hold events and pending actions.
214        sqlx::query(
215            r#"
216            CREATE TABLE IF NOT EXISTS hold_requests (
217                hold_id          TEXT PRIMARY KEY,
218                envelope_id      TEXT NOT NULL,
219                agent_id         TEXT NOT NULL,
220                trigger_target   TEXT NOT NULL,
221                trigger_action   TEXT NOT NULL,
222                pending_payload  TEXT NOT NULL,
223                status           TEXT NOT NULL DEFAULT 'pending'
224                                 CHECK(status IN ('pending','approved','rejected','timed_out')),
225                decision         TEXT,
226                instruction      TEXT,
227                triggered_at     TEXT NOT NULL,
228                resolved_at      TEXT,
229                FOREIGN KEY (envelope_id) REFERENCES envelopes(envelope_id)
230            )
231            "#,
232        )
233        .execute(pool)
234        .await?;
235
236        Ok(())
237    }
238
239    async fn seed_defaults(pool: &SqlitePool) -> KernelResult<()> {
240        let now = now_millis_string();
241        sqlx::query(
242            r#"
243            INSERT INTO system_meta (key, value)
244            VALUES ('schema_version', 'v1')
245            ON CONFLICT(key) DO NOTHING
246            "#,
247        )
248        .execute(pool)
249        .await?;
250
251        sqlx::query(
252            r#"
253            INSERT INTO energy_ledger (actor_id, energy_balance, reserved_energy, updated_at)
254            VALUES ('root', 1000000, 0, ?1)
255            ON CONFLICT(actor_id) DO NOTHING
256            "#,
257        )
258        .bind(&now)
259        .execute(pool)
260        .await?;
261
262        // Phase 1: seed root as human actor with full wildcard boundary (PIP-001 §10).
263        // Root is a genesis actor — it receives a one-time energy balance but does not
264        // participate in tick-based energy distribution (energy_share = 0.0).
265        // Only agents receive ongoing energy production.
266        sqlx::query(
267            r#"
268            INSERT INTO actors (
269                actor_id, actor_type, creator_id, lineage, purpose, status,
270                writable_targets, energy_share, reduction_policy, created_at, updated_at
271            )
272            VALUES ('root', 'human', NULL, '[]', NULL, 'active',
273                    '[{"target":"**","actions":["create","mutate","execute"]}]',
274                    0.0, 'none', ?1, ?1)
275            ON CONFLICT(actor_id) DO NOTHING
276            "#,
277        )
278        .bind(&now)
279        .execute(pool)
280        .await?;
281
282        Ok(())
283    }
284
285    pub async fn run_startup_integrity_checks(&self) -> KernelResult<()> {
286        info!("running startup integrity checks");
287        let violation = sqlx::query(
288            r#"
289            SELECT actor_id, energy_balance, reserved_energy
290            FROM energy_ledger
291            WHERE reserved_energy < 0
292               OR energy_balance < 0
293               OR reserved_energy > energy_balance
294            LIMIT 1
295            "#,
296        )
297        .fetch_optional(&self.pool)
298        .await?;
299
300        if let Some(row) = violation {
301            let actor_id: String = row.get("actor_id");
302            return Err(KernelError::PolicyViolation(format!(
303                "energy ledger consistency violation for actor {actor_id}"
304            )));
305        }
306
307        let invalid_action = sqlx::query(
308            r#"
309            SELECT action_type
310            FROM events
311            WHERE action_type NOT IN (
312                'observe', 'create', 'mutate', 'execute',
313                'circuit_breaker',
314                'hold_request', 'hold_response', 'hold_timeout'
315            )
316            LIMIT 1
317            "#,
318        )
319        .fetch_optional(&self.pool)
320        .await?;
321        if let Some(row) = invalid_action {
322            let action_type: String = row.get("action_type");
323            return Err(KernelError::PolicyViolation(format!(
324                "action integrity violation: unsupported action_type={action_type}"
325            )));
326        }
327
328        let invalid_energy_event = sqlx::query(
329            r#"
330            SELECT id, log_index, action_type, reserved_energy, settled_energy
331            FROM events
332            WHERE (action_type IN ('mutate', 'execute') AND (reserved_energy <= 0 OR settled_energy <= 0))
333               OR reserved_energy < 0
334               OR settled_energy < 0
335            LIMIT 1
336            "#,
337        )
338        .fetch_optional(&self.pool)
339        .await?;
340        if let Some(row) = invalid_energy_event {
341            let event_id: String = row.get("id");
342            return Err(KernelError::PolicyViolation(format!(
343                "energy integrity violation in event {event_id}"
344            )));
345        }
346
347        self.validate_audit_coverage().await?;
348
349        info!("startup integrity checks passed");
350        Ok(())
351    }
352
353    /// Verify audit log coverage — the latest checkpoint's tree_size must
354    /// equal the total event count, meaning every committed event is captured
355    /// in the Merkle tree. Emits a warning on gap rather than refusing to start.
356    async fn validate_audit_coverage(&self) -> KernelResult<()> {
357        let event_count: i64 = sqlx::query("SELECT COUNT(*) AS cnt FROM events")
358            .fetch_one(&self.pool)
359            .await?
360            .get("cnt");
361
362        if event_count == 0 {
363            return Ok(());
364        }
365
366        let checkpoint_tree_size: Option<i64> =
367            sqlx::query("SELECT tree_size FROM audit_checkpoints ORDER BY tree_size DESC LIMIT 1")
368                .fetch_optional(&self.pool)
369                .await?
370                .map(|r| r.get("tree_size"));
371
372        match checkpoint_tree_size {
373            None => {
374                warn!(
375                    event_count,
376                    "audit coverage gap: {} event(s) committed but no audit checkpoint exists",
377                    event_count
378                );
379            }
380            Some(tree_size) if tree_size < event_count => {
381                warn!(
382                    event_count,
383                    checkpoint_tree_size = tree_size,
384                    "audit coverage gap: {} event(s) not yet covered by Merkle checkpoint",
385                    event_count - tree_size
386                );
387            }
388            _ => {}
389        }
390
391        Ok(())
392    }
393
394    /// Records a governance policy version change in system_meta.
395    /// Called by the kernel whenever a `system/policy` Create event is committed.
396    pub async fn set_policy_version(&self, version: &str) -> KernelResult<()> {
397        self.set_meta("policy_version", version).await
398    }
399
400    async fn set_meta(&self, key: &str, value: &str) -> KernelResult<()> {
401        sqlx::query(
402            r#"
403            INSERT INTO system_meta (key, value)
404            VALUES (?1, ?2)
405            ON CONFLICT(key) DO UPDATE SET value = excluded.value
406            "#,
407        )
408        .bind(key)
409        .bind(value)
410        .execute(&self.pool)
411        .await?;
412        Ok(())
413    }
414}
415
/// Returns the current wall-clock time as decimal milliseconds since the Unix
/// epoch.
///
/// If the system clock reads earlier than the epoch, the result is `"0"`.
fn now_millis_string() -> String {
    let since_epoch = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock is set before 1970: fall back to a zero duration.
        Err(_) => std::time::Duration::ZERO,
    };
    since_epoch.as_millis().to_string()
}