1use std::fs;
2use std::path::{Path, PathBuf};
3
4use punkgo_core::errors::{KernelError, KernelResult};
5use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous};
6use sqlx::{Row, SqlitePool};
7use tracing::{info, warn};
8
9#[derive(Clone)]
10pub struct StatePaths {
11 pub root: PathBuf,
12 pub workspace_root: PathBuf,
13 pub quarantine_root: PathBuf,
14 pub blobs_root: PathBuf,
15 pub db_path: PathBuf,
16}
17
18pub struct StateStore {
19 pool: SqlitePool,
20 paths: StatePaths,
21}
22
23impl StateStore {
24 pub async fn bootstrap(root: impl AsRef<Path>) -> KernelResult<Self> {
25 let root = root.as_ref().to_path_buf();
26 let workspace_root = root.join("workspaces");
27 let quarantine_root = root.join("quarantine");
28 let event_log_root = root.join("event_log");
29 let blobs_root = root.join("blobs");
30
31 fs::create_dir_all(&workspace_root)?;
32 fs::create_dir_all(&quarantine_root)?;
33 fs::create_dir_all(&event_log_root)?;
34 fs::create_dir_all(&blobs_root)?;
35
36 let db_path = root.join("punkgo.db");
41 let connect_options = SqliteConnectOptions::new()
42 .filename(&db_path)
43 .create_if_missing(true)
44 .journal_mode(SqliteJournalMode::Wal)
45 .synchronous(SqliteSynchronous::Full)
46 .busy_timeout(std::time::Duration::from_secs(5));
47
48 let max_conn = std::thread::available_parallelism()
49 .map(|n| n.get() as u32)
50 .unwrap_or(8)
51 .max(8);
52
53 let pool = SqlitePoolOptions::new()
54 .max_connections(max_conn)
55 .connect_with(connect_options)
56 .await?;
57
58 Self::init_schema(&pool).await?;
59 Self::seed_defaults(&pool).await?;
60
61 let store = Self {
62 pool,
63 paths: StatePaths {
64 root,
65 workspace_root,
66 quarantine_root,
67 blobs_root,
68 db_path,
69 },
70 };
71
72 info!(state_root = %store.paths.root.display(), "state store bootstrapped");
73 store.run_startup_integrity_checks().await?;
74 Ok(store)
75 }
76
77 pub fn pool(&self) -> SqlitePool {
78 self.pool.clone()
79 }
80
81 pub fn paths(&self) -> &StatePaths {
82 &self.paths
83 }
84
85 pub async fn actor_exists(&self, actor_id: &str) -> KernelResult<bool> {
86 let row = sqlx::query("SELECT actor_id FROM actors WHERE actor_id = ?1")
87 .bind(actor_id)
88 .fetch_optional(&self.pool)
89 .await?;
90 Ok(row.is_some())
91 }
92
93 async fn init_schema(pool: &SqlitePool) -> KernelResult<()> {
94 sqlx::query(
95 r#"
96 CREATE TABLE IF NOT EXISTS events (
97 id TEXT PRIMARY KEY,
98 log_index INTEGER NOT NULL UNIQUE,
99 event_hash TEXT NOT NULL,
100 actor_id TEXT NOT NULL,
101 action_type TEXT NOT NULL,
102 target TEXT NOT NULL,
103 payload TEXT NOT NULL,
104 payload_hash TEXT NOT NULL,
105 artifact_hash TEXT,
106 reserved_energy INTEGER NOT NULL,
107 settled_energy INTEGER NOT NULL,
108 timestamp TEXT NOT NULL
109 )
110 "#,
111 )
112 .execute(pool)
113 .await?;
114
115 sqlx::query(
116 r#"
117 CREATE TABLE IF NOT EXISTS energy_ledger (
118 actor_id TEXT PRIMARY KEY,
119 energy_balance INTEGER NOT NULL,
120 reserved_energy INTEGER NOT NULL DEFAULT 0,
121 updated_at TEXT NOT NULL
122 )
123 "#,
124 )
125 .execute(pool)
126 .await?;
127
128 sqlx::query(
129 r#"
130 CREATE TABLE IF NOT EXISTS system_meta (
131 key TEXT PRIMARY KEY,
132 value TEXT NOT NULL
133 )
134 "#,
135 )
136 .execute(pool)
137 .await?;
138
139 sqlx::query(
140 r#"
141 CREATE TABLE IF NOT EXISTS audit_hashes (
142 hash_index INTEGER NOT NULL PRIMARY KEY,
143 hash BLOB NOT NULL
144 )
145 "#,
146 )
147 .execute(pool)
148 .await?;
149
150 sqlx::query(
151 r#"
152 CREATE TABLE IF NOT EXISTS audit_checkpoints (
153 tree_size INTEGER NOT NULL PRIMARY KEY,
154 root_hash TEXT NOT NULL,
155 checkpoint_text TEXT NOT NULL,
156 created_at TEXT NOT NULL
157 )
158 "#,
159 )
160 .execute(pool)
161 .await?;
162
163 sqlx::query(
165 r#"
166 CREATE TABLE IF NOT EXISTS actors (
167 actor_id TEXT PRIMARY KEY,
168 actor_type TEXT NOT NULL CHECK(actor_type IN ('human', 'agent')),
169 creator_id TEXT,
170 lineage TEXT NOT NULL DEFAULT '[]',
171 purpose TEXT,
172 status TEXT NOT NULL DEFAULT 'active' CHECK(status IN ('active', 'frozen')),
173 writable_targets TEXT NOT NULL DEFAULT '[]',
174 energy_share REAL NOT NULL DEFAULT 0.0,
175 reduction_policy TEXT NOT NULL DEFAULT 'none',
176 created_at TEXT NOT NULL,
177 updated_at TEXT NOT NULL
178 )
179 "#,
180 )
181 .execute(pool)
182 .await?;
183
184 sqlx::query(
187 r#"
188 CREATE TABLE IF NOT EXISTS envelopes (
189 envelope_id TEXT PRIMARY KEY,
190 actor_id TEXT NOT NULL,
191 grantor_id TEXT NOT NULL,
192 parent_envelope_id TEXT,
193 budget INTEGER NOT NULL,
194 budget_consumed INTEGER NOT NULL DEFAULT 0,
195 targets TEXT NOT NULL,
196 actions TEXT NOT NULL,
197 duration_secs INTEGER,
198 report_every INTEGER,
199 hold_on TEXT NOT NULL DEFAULT '[]',
200 hold_timeout_secs INTEGER,
201 status TEXT NOT NULL DEFAULT 'active'
202 CHECK(status IN ('active','expired','revoked')),
203 last_report_at INTEGER NOT NULL DEFAULT 0,
204 created_at TEXT NOT NULL,
205 expires_at TEXT,
206 updated_at TEXT NOT NULL
207 )
208 "#,
209 )
210 .execute(pool)
211 .await?;
212
213 sqlx::query(
215 r#"
216 CREATE TABLE IF NOT EXISTS hold_requests (
217 hold_id TEXT PRIMARY KEY,
218 envelope_id TEXT NOT NULL,
219 agent_id TEXT NOT NULL,
220 trigger_target TEXT NOT NULL,
221 trigger_action TEXT NOT NULL,
222 pending_payload TEXT NOT NULL,
223 status TEXT NOT NULL DEFAULT 'pending'
224 CHECK(status IN ('pending','approved','rejected','timed_out')),
225 decision TEXT,
226 instruction TEXT,
227 triggered_at TEXT NOT NULL,
228 resolved_at TEXT,
229 FOREIGN KEY (envelope_id) REFERENCES envelopes(envelope_id)
230 )
231 "#,
232 )
233 .execute(pool)
234 .await?;
235
236 Ok(())
237 }
238
239 async fn seed_defaults(pool: &SqlitePool) -> KernelResult<()> {
240 let now = now_millis_string();
241 sqlx::query(
242 r#"
243 INSERT INTO system_meta (key, value)
244 VALUES ('schema_version', 'v1')
245 ON CONFLICT(key) DO NOTHING
246 "#,
247 )
248 .execute(pool)
249 .await?;
250
251 sqlx::query(
252 r#"
253 INSERT INTO energy_ledger (actor_id, energy_balance, reserved_energy, updated_at)
254 VALUES ('root', 1000000, 0, ?1)
255 ON CONFLICT(actor_id) DO NOTHING
256 "#,
257 )
258 .bind(&now)
259 .execute(pool)
260 .await?;
261
262 sqlx::query(
264 r#"
265 INSERT INTO actors (
266 actor_id, actor_type, creator_id, lineage, purpose, status,
267 writable_targets, energy_share, reduction_policy, created_at, updated_at
268 )
269 VALUES ('root', 'human', NULL, '[]', NULL, 'active',
270 '[{"target":"**","actions":["create","mutate","execute"]}]',
271 100.0, 'none', ?1, ?1)
272 ON CONFLICT(actor_id) DO NOTHING
273 "#,
274 )
275 .bind(&now)
276 .execute(pool)
277 .await?;
278
279 Ok(())
280 }
281
282 pub async fn run_startup_integrity_checks(&self) -> KernelResult<()> {
283 info!("running startup integrity checks");
284 let violation = sqlx::query(
285 r#"
286 SELECT actor_id, energy_balance, reserved_energy
287 FROM energy_ledger
288 WHERE reserved_energy < 0
289 OR energy_balance < 0
290 OR reserved_energy > energy_balance
291 LIMIT 1
292 "#,
293 )
294 .fetch_optional(&self.pool)
295 .await?;
296
297 if let Some(row) = violation {
298 let actor_id: String = row.get("actor_id");
299 return Err(KernelError::PolicyViolation(format!(
300 "energy ledger consistency violation for actor {actor_id}"
301 )));
302 }
303
304 let invalid_action = sqlx::query(
305 r#"
306 SELECT action_type
307 FROM events
308 WHERE action_type NOT IN (
309 'observe', 'create', 'mutate', 'execute',
310 'circuit_breaker',
311 'hold_request', 'hold_response', 'hold_timeout'
312 )
313 LIMIT 1
314 "#,
315 )
316 .fetch_optional(&self.pool)
317 .await?;
318 if let Some(row) = invalid_action {
319 let action_type: String = row.get("action_type");
320 return Err(KernelError::PolicyViolation(format!(
321 "action integrity violation: unsupported action_type={action_type}"
322 )));
323 }
324
325 let invalid_energy_event = sqlx::query(
326 r#"
327 SELECT id, log_index, action_type, reserved_energy, settled_energy
328 FROM events
329 WHERE (action_type IN ('mutate', 'execute') AND (reserved_energy <= 0 OR settled_energy <= 0))
330 OR reserved_energy < 0
331 OR settled_energy < 0
332 LIMIT 1
333 "#,
334 )
335 .fetch_optional(&self.pool)
336 .await?;
337 if let Some(row) = invalid_energy_event {
338 let event_id: String = row.get("id");
339 return Err(KernelError::PolicyViolation(format!(
340 "energy integrity violation in event {event_id}"
341 )));
342 }
343
344 self.validate_audit_coverage().await?;
345
346 info!("startup integrity checks passed");
347 Ok(())
348 }
349
350 async fn validate_audit_coverage(&self) -> KernelResult<()> {
354 let event_count: i64 = sqlx::query("SELECT COUNT(*) AS cnt FROM events")
355 .fetch_one(&self.pool)
356 .await?
357 .get("cnt");
358
359 if event_count == 0 {
360 return Ok(());
361 }
362
363 let checkpoint_tree_size: Option<i64> =
364 sqlx::query("SELECT tree_size FROM audit_checkpoints ORDER BY tree_size DESC LIMIT 1")
365 .fetch_optional(&self.pool)
366 .await?
367 .map(|r| r.get("tree_size"));
368
369 match checkpoint_tree_size {
370 None => {
371 warn!(
372 event_count,
373 "audit coverage gap: {} event(s) committed but no audit checkpoint exists",
374 event_count
375 );
376 }
377 Some(tree_size) if tree_size < event_count => {
378 warn!(
379 event_count,
380 checkpoint_tree_size = tree_size,
381 "audit coverage gap: {} event(s) not yet covered by Merkle checkpoint",
382 event_count - tree_size
383 );
384 }
385 _ => {}
386 }
387
388 Ok(())
389 }
390
391 pub async fn set_policy_version(&self, version: &str) -> KernelResult<()> {
394 self.set_meta("policy_version", version).await
395 }
396
397 async fn set_meta(&self, key: &str, value: &str) -> KernelResult<()> {
398 sqlx::query(
399 r#"
400 INSERT INTO system_meta (key, value)
401 VALUES (?1, ?2)
402 ON CONFLICT(key) DO UPDATE SET value = excluded.value
403 "#,
404 )
405 .bind(key)
406 .bind(value)
407 .execute(&self.pool)
408 .await?;
409 Ok(())
410 }
411}
412
413fn now_millis_string() -> String {
414 let now = std::time::SystemTime::now()
415 .duration_since(std::time::UNIX_EPOCH)
416 .unwrap_or_default();
417 now.as_millis().to_string()
418}