1use std::fs;
2use std::path::{Path, PathBuf};
3
4use punkgo_core::errors::{KernelError, KernelResult};
5use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous};
6use sqlx::{Row, SqlitePool};
7use tracing::{info, warn};
8
9#[derive(Clone)]
10pub struct StatePaths {
11 pub root: PathBuf,
12 pub workspace_root: PathBuf,
13 pub quarantine_root: PathBuf,
14 pub blobs_root: PathBuf,
15 pub db_path: PathBuf,
16}
17
18pub struct StateStore {
19 pool: SqlitePool,
20 paths: StatePaths,
21}
22
23impl StateStore {
24 pub async fn bootstrap(root: impl AsRef<Path>) -> KernelResult<Self> {
25 let root = root.as_ref().to_path_buf();
26 let workspace_root = root.join("workspaces");
27 let quarantine_root = root.join("quarantine");
28 let event_log_root = root.join("event_log");
29 let blobs_root = root.join("blobs");
30
31 fs::create_dir_all(&workspace_root)?;
32 fs::create_dir_all(&quarantine_root)?;
33 fs::create_dir_all(&event_log_root)?;
34 fs::create_dir_all(&blobs_root)?;
35
36 let db_path = root.join("punkgo.db");
41 let connect_options = SqliteConnectOptions::new()
42 .filename(&db_path)
43 .create_if_missing(true)
44 .journal_mode(SqliteJournalMode::Wal)
45 .synchronous(SqliteSynchronous::Full)
46 .busy_timeout(std::time::Duration::from_secs(5));
47
48 let max_conn = std::thread::available_parallelism()
49 .map(|n| n.get() as u32)
50 .unwrap_or(8)
51 .max(8);
52
53 let pool = SqlitePoolOptions::new()
54 .max_connections(max_conn)
55 .connect_with(connect_options)
56 .await?;
57
58 Self::init_schema(&pool).await?;
59 Self::seed_defaults(&pool).await?;
60
61 let store = Self {
62 pool,
63 paths: StatePaths {
64 root,
65 workspace_root,
66 quarantine_root,
67 blobs_root,
68 db_path,
69 },
70 };
71
72 info!(state_root = %store.paths.root.display(), "state store bootstrapped");
73 store.run_startup_integrity_checks().await?;
74 Ok(store)
75 }
76
77 pub fn pool(&self) -> SqlitePool {
78 self.pool.clone()
79 }
80
81 pub fn paths(&self) -> &StatePaths {
82 &self.paths
83 }
84
85 pub async fn actor_exists(&self, actor_id: &str) -> KernelResult<bool> {
86 let row = sqlx::query("SELECT actor_id FROM actors WHERE actor_id = ?1")
87 .bind(actor_id)
88 .fetch_optional(&self.pool)
89 .await?;
90 Ok(row.is_some())
91 }
92
93 async fn init_schema(pool: &SqlitePool) -> KernelResult<()> {
94 sqlx::query(
95 r#"
96 CREATE TABLE IF NOT EXISTS events (
97 id TEXT PRIMARY KEY,
98 log_index INTEGER NOT NULL UNIQUE,
99 event_hash TEXT NOT NULL,
100 actor_id TEXT NOT NULL,
101 action_type TEXT NOT NULL,
102 target TEXT NOT NULL,
103 payload TEXT NOT NULL,
104 payload_hash TEXT NOT NULL,
105 artifact_hash TEXT,
106 reserved_energy INTEGER NOT NULL,
107 settled_energy INTEGER NOT NULL,
108 timestamp TEXT NOT NULL
109 )
110 "#,
111 )
112 .execute(pool)
113 .await?;
114
115 sqlx::query(
116 r#"
117 CREATE TABLE IF NOT EXISTS energy_ledger (
118 actor_id TEXT PRIMARY KEY,
119 energy_balance INTEGER NOT NULL,
120 reserved_energy INTEGER NOT NULL DEFAULT 0,
121 updated_at TEXT NOT NULL
122 )
123 "#,
124 )
125 .execute(pool)
126 .await?;
127
128 sqlx::query(
129 r#"
130 CREATE TABLE IF NOT EXISTS system_meta (
131 key TEXT PRIMARY KEY,
132 value TEXT NOT NULL
133 )
134 "#,
135 )
136 .execute(pool)
137 .await?;
138
139 sqlx::query(
140 r#"
141 CREATE TABLE IF NOT EXISTS audit_hashes (
142 hash_index INTEGER NOT NULL PRIMARY KEY,
143 hash BLOB NOT NULL
144 )
145 "#,
146 )
147 .execute(pool)
148 .await?;
149
150 sqlx::query(
151 r#"
152 CREATE TABLE IF NOT EXISTS audit_checkpoints (
153 tree_size INTEGER NOT NULL PRIMARY KEY,
154 root_hash TEXT NOT NULL,
155 checkpoint_text TEXT NOT NULL,
156 created_at TEXT NOT NULL
157 )
158 "#,
159 )
160 .execute(pool)
161 .await?;
162
163 sqlx::query(
165 r#"
166 CREATE TABLE IF NOT EXISTS actors (
167 actor_id TEXT PRIMARY KEY,
168 actor_type TEXT NOT NULL CHECK(actor_type IN ('human', 'agent')),
169 creator_id TEXT,
170 lineage TEXT NOT NULL DEFAULT '[]',
171 purpose TEXT,
172 status TEXT NOT NULL DEFAULT 'active' CHECK(status IN ('active', 'frozen')),
173 writable_targets TEXT NOT NULL DEFAULT '[]',
174 energy_share REAL NOT NULL DEFAULT 0.0,
175 reduction_policy TEXT NOT NULL DEFAULT 'none',
176 created_at TEXT NOT NULL,
177 updated_at TEXT NOT NULL
178 )
179 "#,
180 )
181 .execute(pool)
182 .await?;
183
184 sqlx::query(
187 r#"
188 CREATE TABLE IF NOT EXISTS envelopes (
189 envelope_id TEXT PRIMARY KEY,
190 actor_id TEXT NOT NULL,
191 grantor_id TEXT NOT NULL,
192 parent_envelope_id TEXT,
193 budget INTEGER NOT NULL,
194 budget_consumed INTEGER NOT NULL DEFAULT 0,
195 targets TEXT NOT NULL,
196 actions TEXT NOT NULL,
197 duration_secs INTEGER,
198 report_every INTEGER,
199 hold_on TEXT NOT NULL DEFAULT '[]',
200 hold_timeout_secs INTEGER,
201 status TEXT NOT NULL DEFAULT 'active'
202 CHECK(status IN ('active','expired','revoked')),
203 last_report_at INTEGER NOT NULL DEFAULT 0,
204 created_at TEXT NOT NULL,
205 expires_at TEXT,
206 updated_at TEXT NOT NULL
207 )
208 "#,
209 )
210 .execute(pool)
211 .await?;
212
213 sqlx::query(
215 r#"
216 CREATE TABLE IF NOT EXISTS hold_requests (
217 hold_id TEXT PRIMARY KEY,
218 envelope_id TEXT NOT NULL,
219 agent_id TEXT NOT NULL,
220 trigger_target TEXT NOT NULL,
221 trigger_action TEXT NOT NULL,
222 pending_payload TEXT NOT NULL,
223 status TEXT NOT NULL DEFAULT 'pending'
224 CHECK(status IN ('pending','approved','rejected','timed_out')),
225 decision TEXT,
226 instruction TEXT,
227 triggered_at TEXT NOT NULL,
228 resolved_at TEXT,
229 FOREIGN KEY (envelope_id) REFERENCES envelopes(envelope_id)
230 )
231 "#,
232 )
233 .execute(pool)
234 .await?;
235
236 Ok(())
237 }
238
239 async fn seed_defaults(pool: &SqlitePool) -> KernelResult<()> {
240 let now = now_millis_string();
241 sqlx::query(
242 r#"
243 INSERT INTO system_meta (key, value)
244 VALUES ('schema_version', 'v1')
245 ON CONFLICT(key) DO NOTHING
246 "#,
247 )
248 .execute(pool)
249 .await?;
250
251 sqlx::query(
252 r#"
253 INSERT INTO energy_ledger (actor_id, energy_balance, reserved_energy, updated_at)
254 VALUES ('root', 1000000, 0, ?1)
255 ON CONFLICT(actor_id) DO NOTHING
256 "#,
257 )
258 .bind(&now)
259 .execute(pool)
260 .await?;
261
262 sqlx::query(
267 r#"
268 INSERT INTO actors (
269 actor_id, actor_type, creator_id, lineage, purpose, status,
270 writable_targets, energy_share, reduction_policy, created_at, updated_at
271 )
272 VALUES ('root', 'human', NULL, '[]', NULL, 'active',
273 '[{"target":"**","actions":["create","mutate","execute"]}]',
274 0.0, 'none', ?1, ?1)
275 ON CONFLICT(actor_id) DO NOTHING
276 "#,
277 )
278 .bind(&now)
279 .execute(pool)
280 .await?;
281
282 Ok(())
283 }
284
285 pub async fn run_startup_integrity_checks(&self) -> KernelResult<()> {
286 info!("running startup integrity checks");
287 let violation = sqlx::query(
288 r#"
289 SELECT actor_id, energy_balance, reserved_energy
290 FROM energy_ledger
291 WHERE reserved_energy < 0
292 OR energy_balance < 0
293 OR reserved_energy > energy_balance
294 LIMIT 1
295 "#,
296 )
297 .fetch_optional(&self.pool)
298 .await?;
299
300 if let Some(row) = violation {
301 let actor_id: String = row.get("actor_id");
302 return Err(KernelError::PolicyViolation(format!(
303 "energy ledger consistency violation for actor {actor_id}"
304 )));
305 }
306
307 let invalid_action = sqlx::query(
308 r#"
309 SELECT action_type
310 FROM events
311 WHERE action_type NOT IN (
312 'observe', 'create', 'mutate', 'execute',
313 'circuit_breaker',
314 'hold_request', 'hold_response', 'hold_timeout'
315 )
316 LIMIT 1
317 "#,
318 )
319 .fetch_optional(&self.pool)
320 .await?;
321 if let Some(row) = invalid_action {
322 let action_type: String = row.get("action_type");
323 return Err(KernelError::PolicyViolation(format!(
324 "action integrity violation: unsupported action_type={action_type}"
325 )));
326 }
327
328 let invalid_energy_event = sqlx::query(
329 r#"
330 SELECT id, log_index, action_type, reserved_energy, settled_energy
331 FROM events
332 WHERE (action_type IN ('mutate', 'execute') AND (reserved_energy <= 0 OR settled_energy <= 0))
333 OR reserved_energy < 0
334 OR settled_energy < 0
335 LIMIT 1
336 "#,
337 )
338 .fetch_optional(&self.pool)
339 .await?;
340 if let Some(row) = invalid_energy_event {
341 let event_id: String = row.get("id");
342 return Err(KernelError::PolicyViolation(format!(
343 "energy integrity violation in event {event_id}"
344 )));
345 }
346
347 self.validate_audit_coverage().await?;
348
349 info!("startup integrity checks passed");
350 Ok(())
351 }
352
353 async fn validate_audit_coverage(&self) -> KernelResult<()> {
357 let event_count: i64 = sqlx::query("SELECT COUNT(*) AS cnt FROM events")
358 .fetch_one(&self.pool)
359 .await?
360 .get("cnt");
361
362 if event_count == 0 {
363 return Ok(());
364 }
365
366 let checkpoint_tree_size: Option<i64> =
367 sqlx::query("SELECT tree_size FROM audit_checkpoints ORDER BY tree_size DESC LIMIT 1")
368 .fetch_optional(&self.pool)
369 .await?
370 .map(|r| r.get("tree_size"));
371
372 match checkpoint_tree_size {
373 None => {
374 warn!(
375 event_count,
376 "audit coverage gap: {} event(s) committed but no audit checkpoint exists",
377 event_count
378 );
379 }
380 Some(tree_size) if tree_size < event_count => {
381 warn!(
382 event_count,
383 checkpoint_tree_size = tree_size,
384 "audit coverage gap: {} event(s) not yet covered by Merkle checkpoint",
385 event_count - tree_size
386 );
387 }
388 _ => {}
389 }
390
391 Ok(())
392 }
393
394 pub async fn set_policy_version(&self, version: &str) -> KernelResult<()> {
397 self.set_meta("policy_version", version).await
398 }
399
400 async fn set_meta(&self, key: &str, value: &str) -> KernelResult<()> {
401 sqlx::query(
402 r#"
403 INSERT INTO system_meta (key, value)
404 VALUES (?1, ?2)
405 ON CONFLICT(key) DO UPDATE SET value = excluded.value
406 "#,
407 )
408 .bind(key)
409 .bind(value)
410 .execute(&self.pool)
411 .await?;
412 Ok(())
413 }
414}
415
416fn now_millis_string() -> String {
417 let now = std::time::SystemTime::now()
418 .duration_since(std::time::UNIX_EPOCH)
419 .unwrap_or_default();
420 now.as_millis().to_string()
421}