1use std::fs;
2use std::path::{Path, PathBuf};
3
4use punkgo_core::errors::{KernelError, KernelResult};
5use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous};
6use sqlx::{Row, SqlitePool};
7use tracing::{info, warn};
8
9#[derive(Clone)]
10pub struct StatePaths {
11 pub root: PathBuf,
12 pub workspace_root: PathBuf,
13 pub quarantine_root: PathBuf,
14 pub blobs_root: PathBuf,
15 pub db_path: PathBuf,
16}
17
18pub struct StateStore {
19 pool: SqlitePool,
20 paths: StatePaths,
21}
22
23impl StateStore {
24 pub async fn bootstrap(root: impl AsRef<Path>) -> KernelResult<Self> {
25 let root = root.as_ref().to_path_buf();
26 let workspace_root = root.join("workspaces");
27 let quarantine_root = root.join("quarantine");
28 let event_log_root = root.join("event_log");
29 let blobs_root = root.join("blobs");
30
31 fs::create_dir_all(&workspace_root)?;
32 fs::create_dir_all(&quarantine_root)?;
33 fs::create_dir_all(&event_log_root)?;
34 fs::create_dir_all(&blobs_root)?;
35
36 let db_path = root.join("punkgo.db");
41 let connect_options = SqliteConnectOptions::new()
42 .filename(&db_path)
43 .create_if_missing(true)
44 .journal_mode(SqliteJournalMode::Wal)
45 .synchronous(SqliteSynchronous::Full)
46 .busy_timeout(std::time::Duration::from_secs(5));
47
48 let max_conn = std::thread::available_parallelism()
49 .map(|n| n.get() as u32)
50 .unwrap_or(8)
51 .max(8);
52
53 let pool = SqlitePoolOptions::new()
54 .max_connections(max_conn)
55 .connect_with(connect_options)
56 .await?;
57
58 Self::init_schema(&pool).await?;
59 Self::seed_defaults(&pool).await?;
60
61 let store = Self {
62 pool,
63 paths: StatePaths {
64 root,
65 workspace_root,
66 quarantine_root,
67 blobs_root,
68 db_path,
69 },
70 };
71
72 info!(state_root = %store.paths.root.display(), "state store bootstrapped");
73 store.run_startup_integrity_checks().await?;
74 Ok(store)
75 }
76
77 pub fn pool(&self) -> SqlitePool {
78 self.pool.clone()
79 }
80
81 pub fn paths(&self) -> &StatePaths {
82 &self.paths
83 }
84
85 pub async fn actor_exists(&self, actor_id: &str) -> KernelResult<bool> {
86 let row = sqlx::query("SELECT actor_id FROM actors WHERE actor_id = ?1")
87 .bind(actor_id)
88 .fetch_optional(&self.pool)
89 .await?;
90 Ok(row.is_some())
91 }
92
93 async fn init_schema(pool: &SqlitePool) -> KernelResult<()> {
94 sqlx::query(
95 r#"
96 CREATE TABLE IF NOT EXISTS events (
97 id TEXT PRIMARY KEY,
98 log_index INTEGER NOT NULL UNIQUE,
99 event_hash TEXT NOT NULL,
100 actor_id TEXT NOT NULL,
101 action_type TEXT NOT NULL,
102 target TEXT NOT NULL,
103 payload TEXT NOT NULL,
104 payload_hash TEXT NOT NULL,
105 artifact_hash TEXT,
106 reserved_energy INTEGER NOT NULL,
107 settled_energy INTEGER NOT NULL,
108 timestamp TEXT NOT NULL
109 )
110 "#,
111 )
112 .execute(pool)
113 .await?;
114
115 sqlx::query(
116 r#"
117 CREATE TABLE IF NOT EXISTS energy_ledger (
118 actor_id TEXT PRIMARY KEY,
119 energy_balance INTEGER NOT NULL,
120 reserved_energy INTEGER NOT NULL DEFAULT 0,
121 updated_at TEXT NOT NULL
122 )
123 "#,
124 )
125 .execute(pool)
126 .await?;
127
128 sqlx::query(
129 r#"
130 CREATE TABLE IF NOT EXISTS system_meta (
131 key TEXT PRIMARY KEY,
132 value TEXT NOT NULL
133 )
134 "#,
135 )
136 .execute(pool)
137 .await?;
138
139 sqlx::query(
140 r#"
141 CREATE TABLE IF NOT EXISTS audit_hashes (
142 hash_index INTEGER NOT NULL PRIMARY KEY,
143 hash BLOB NOT NULL
144 )
145 "#,
146 )
147 .execute(pool)
148 .await?;
149
150 sqlx::query(
151 r#"
152 CREATE TABLE IF NOT EXISTS audit_checkpoints (
153 tree_size INTEGER NOT NULL PRIMARY KEY,
154 root_hash TEXT NOT NULL,
155 checkpoint_text TEXT NOT NULL,
156 created_at TEXT NOT NULL
157 )
158 "#,
159 )
160 .execute(pool)
161 .await?;
162
163 sqlx::query(
165 r#"
166 CREATE TABLE IF NOT EXISTS audit_tsa_tokens (
167 tree_size INTEGER NOT NULL PRIMARY KEY,
168 root_hash TEXT NOT NULL,
169 tsa_response BLOB NOT NULL,
170 tsa_url TEXT NOT NULL,
171 created_at TEXT NOT NULL
172 )
173 "#,
174 )
175 .execute(pool)
176 .await?;
177
178 sqlx::query(
180 r#"
181 CREATE TABLE IF NOT EXISTS actors (
182 actor_id TEXT PRIMARY KEY,
183 actor_type TEXT NOT NULL CHECK(actor_type IN ('human', 'agent')),
184 creator_id TEXT,
185 lineage TEXT NOT NULL DEFAULT '[]',
186 purpose TEXT,
187 status TEXT NOT NULL DEFAULT 'active' CHECK(status IN ('active', 'frozen')),
188 writable_targets TEXT NOT NULL DEFAULT '[]',
189 energy_share REAL NOT NULL DEFAULT 0.0,
190 reduction_policy TEXT NOT NULL DEFAULT 'none',
191 created_at TEXT NOT NULL,
192 updated_at TEXT NOT NULL
193 )
194 "#,
195 )
196 .execute(pool)
197 .await?;
198
199 sqlx::query(
202 r#"
203 CREATE TABLE IF NOT EXISTS envelopes (
204 envelope_id TEXT PRIMARY KEY,
205 actor_id TEXT NOT NULL,
206 grantor_id TEXT NOT NULL,
207 parent_envelope_id TEXT,
208 budget INTEGER NOT NULL,
209 budget_consumed INTEGER NOT NULL DEFAULT 0,
210 targets TEXT NOT NULL,
211 actions TEXT NOT NULL,
212 duration_secs INTEGER,
213 report_every INTEGER,
214 hold_on TEXT NOT NULL DEFAULT '[]',
215 hold_timeout_secs INTEGER,
216 status TEXT NOT NULL DEFAULT 'active'
217 CHECK(status IN ('active','expired','revoked')),
218 last_report_at INTEGER NOT NULL DEFAULT 0,
219 created_at TEXT NOT NULL,
220 expires_at TEXT,
221 updated_at TEXT NOT NULL
222 )
223 "#,
224 )
225 .execute(pool)
226 .await?;
227
228 sqlx::query(
230 r#"
231 CREATE TABLE IF NOT EXISTS hold_requests (
232 hold_id TEXT PRIMARY KEY,
233 envelope_id TEXT NOT NULL,
234 agent_id TEXT NOT NULL,
235 trigger_target TEXT NOT NULL,
236 trigger_action TEXT NOT NULL,
237 pending_payload TEXT NOT NULL,
238 status TEXT NOT NULL DEFAULT 'pending'
239 CHECK(status IN ('pending','approved','rejected','timed_out')),
240 decision TEXT,
241 instruction TEXT,
242 triggered_at TEXT NOT NULL,
243 resolved_at TEXT,
244 FOREIGN KEY (envelope_id) REFERENCES envelopes(envelope_id)
245 )
246 "#,
247 )
248 .execute(pool)
249 .await?;
250
251 Ok(())
252 }
253
254 async fn seed_defaults(pool: &SqlitePool) -> KernelResult<()> {
255 let now = now_millis_string();
256 sqlx::query(
257 r#"
258 INSERT INTO system_meta (key, value)
259 VALUES ('schema_version', 'v1')
260 ON CONFLICT(key) DO NOTHING
261 "#,
262 )
263 .execute(pool)
264 .await?;
265
266 sqlx::query(
267 r#"
268 INSERT INTO energy_ledger (actor_id, energy_balance, reserved_energy, updated_at)
269 VALUES ('root', 1000000, 0, ?1)
270 ON CONFLICT(actor_id) DO NOTHING
271 "#,
272 )
273 .bind(&now)
274 .execute(pool)
275 .await?;
276
277 sqlx::query(
282 r#"
283 INSERT INTO actors (
284 actor_id, actor_type, creator_id, lineage, purpose, status,
285 writable_targets, energy_share, reduction_policy, created_at, updated_at
286 )
287 VALUES ('root', 'human', NULL, '[]', NULL, 'active',
288 '[{"target":"**","actions":["create","mutate","execute"]}]',
289 0.0, 'none', ?1, ?1)
290 ON CONFLICT(actor_id) DO NOTHING
291 "#,
292 )
293 .bind(&now)
294 .execute(pool)
295 .await?;
296
297 Ok(())
298 }
299
300 pub async fn run_startup_integrity_checks(&self) -> KernelResult<()> {
301 info!("running startup integrity checks");
302 let violation = sqlx::query(
303 r#"
304 SELECT actor_id, energy_balance, reserved_energy
305 FROM energy_ledger
306 WHERE reserved_energy < 0
307 OR energy_balance < 0
308 OR reserved_energy > energy_balance
309 LIMIT 1
310 "#,
311 )
312 .fetch_optional(&self.pool)
313 .await?;
314
315 if let Some(row) = violation {
316 let actor_id: String = row.get("actor_id");
317 return Err(KernelError::PolicyViolation(format!(
318 "energy ledger consistency violation for actor {actor_id}"
319 )));
320 }
321
322 let invalid_action = sqlx::query(
323 r#"
324 SELECT action_type
325 FROM events
326 WHERE action_type NOT IN (
327 'observe', 'create', 'mutate', 'execute',
328 'circuit_breaker',
329 'hold_request', 'hold_response', 'hold_timeout'
330 )
331 LIMIT 1
332 "#,
333 )
334 .fetch_optional(&self.pool)
335 .await?;
336 if let Some(row) = invalid_action {
337 let action_type: String = row.get("action_type");
338 return Err(KernelError::PolicyViolation(format!(
339 "action integrity violation: unsupported action_type={action_type}"
340 )));
341 }
342
343 let invalid_energy_event = sqlx::query(
344 r#"
345 SELECT id, log_index, action_type, reserved_energy, settled_energy
346 FROM events
347 WHERE (action_type IN ('mutate', 'execute') AND (reserved_energy <= 0 OR settled_energy <= 0))
348 OR reserved_energy < 0
349 OR settled_energy < 0
350 LIMIT 1
351 "#,
352 )
353 .fetch_optional(&self.pool)
354 .await?;
355 if let Some(row) = invalid_energy_event {
356 let event_id: String = row.get("id");
357 return Err(KernelError::PolicyViolation(format!(
358 "energy integrity violation in event {event_id}"
359 )));
360 }
361
362 self.validate_audit_coverage().await?;
363
364 info!("startup integrity checks passed");
365 Ok(())
366 }
367
368 async fn validate_audit_coverage(&self) -> KernelResult<()> {
372 let event_count: i64 = sqlx::query("SELECT COUNT(*) AS cnt FROM events")
373 .fetch_one(&self.pool)
374 .await?
375 .get("cnt");
376
377 if event_count == 0 {
378 return Ok(());
379 }
380
381 let checkpoint_tree_size: Option<i64> =
382 sqlx::query("SELECT tree_size FROM audit_checkpoints ORDER BY tree_size DESC LIMIT 1")
383 .fetch_optional(&self.pool)
384 .await?
385 .map(|r| r.get("tree_size"));
386
387 match checkpoint_tree_size {
388 None => {
389 warn!(
390 event_count,
391 "audit coverage gap: {} event(s) committed but no audit checkpoint exists",
392 event_count
393 );
394 }
395 Some(tree_size) if tree_size < event_count => {
396 warn!(
397 event_count,
398 checkpoint_tree_size = tree_size,
399 "audit coverage gap: {} event(s) not yet covered by Merkle checkpoint",
400 event_count - tree_size
401 );
402 }
403 _ => {}
404 }
405
406 Ok(())
407 }
408
409 pub async fn set_policy_version(&self, version: &str) -> KernelResult<()> {
412 self.set_meta("policy_version", version).await
413 }
414
415 async fn set_meta(&self, key: &str, value: &str) -> KernelResult<()> {
416 sqlx::query(
417 r#"
418 INSERT INTO system_meta (key, value)
419 VALUES (?1, ?2)
420 ON CONFLICT(key) DO UPDATE SET value = excluded.value
421 "#,
422 )
423 .bind(key)
424 .bind(value)
425 .execute(&self.pool)
426 .await?;
427 Ok(())
428 }
429}
430
431fn now_millis_string() -> String {
432 let now = std::time::SystemTime::now()
433 .duration_since(std::time::UNIX_EPOCH)
434 .unwrap_or_default();
435 now.as_millis().to_string()
436}