1use sqlx::{Row, Sqlite, SqlitePool, Transaction};
7
8use punkgo_core::consent::{EnvelopeRecord, EnvelopeSpec, EnvelopeStatus, HoldRule};
9use punkgo_core::errors::{KernelError, KernelResult};
10
11pub struct NewHoldRequest<'a> {
16 pub hold_id: &'a str,
18 pub envelope_id: &'a str,
20 pub agent_id: &'a str,
22 pub trigger_target: &'a str,
24 pub trigger_action: &'a str,
26 pub pending_payload: &'a serde_json::Value,
28}
29
30#[derive(Clone)]
31pub struct EnvelopeStore {
32 pool: SqlitePool,
33}
34
35impl EnvelopeStore {
36 pub fn new(pool: SqlitePool) -> Self {
37 Self { pool }
38 }
39
40 pub async fn create_in_tx(
42 &self,
43 tx: &mut Transaction<'_, Sqlite>,
44 envelope_id: &str,
45 spec: &EnvelopeSpec,
46 parent_envelope_id: Option<&str>,
47 ) -> KernelResult<EnvelopeRecord> {
48 let targets_json = serde_json::to_string(&spec.targets).map_err(|e| {
49 KernelError::PolicyViolation(format!("failed to serialize targets: {e}"))
50 })?;
51 let actions_json = serde_json::to_string(&spec.actions).map_err(|e| {
52 KernelError::PolicyViolation(format!("failed to serialize actions: {e}"))
53 })?;
54 let hold_on_json = serde_json::to_string(&spec.hold_on).map_err(|e| {
55 KernelError::PolicyViolation(format!("failed to serialize hold_on: {e}"))
56 })?;
57 let now = now_millis_string();
58
59 let expires_at = spec.duration_secs.map(|d| {
61 let now_ms: u64 = now.parse().unwrap_or(0);
62 let expires_ms = now_ms + (d as u64) * 1000;
63 expires_ms.to_string()
64 });
65
66 sqlx::query(
67 r#"
68 INSERT INTO envelopes (
69 envelope_id, actor_id, grantor_id, parent_envelope_id,
70 budget, budget_consumed, targets, actions,
71 duration_secs, report_every,
72 hold_on, hold_timeout_secs,
73 status, last_report_at,
74 created_at, expires_at, updated_at
75 )
76 VALUES (?1, ?2, ?3, ?4, ?5, 0, ?6, ?7, ?8, ?9,
77 ?10, ?11,
78 'active', 0, ?12, ?13, ?12)
79 "#,
80 )
81 .bind(envelope_id)
82 .bind(&spec.actor_id)
83 .bind(&spec.grantor_id)
84 .bind(parent_envelope_id)
85 .bind(spec.budget)
86 .bind(&targets_json)
87 .bind(&actions_json)
88 .bind(spec.duration_secs)
89 .bind(spec.report_every)
90 .bind(&hold_on_json)
91 .bind(spec.hold_timeout_secs)
92 .bind(&now)
93 .bind(&expires_at)
94 .execute(&mut **tx)
95 .await?;
96
97 Ok(EnvelopeRecord {
98 envelope_id: envelope_id.to_string(),
99 actor_id: spec.actor_id.clone(),
100 grantor_id: spec.grantor_id.clone(),
101 parent_envelope_id: parent_envelope_id.map(|s| s.to_string()),
102 budget: spec.budget,
103 budget_consumed: 0,
104 targets: spec.targets.clone(),
105 actions: spec.actions.clone(),
106 duration_secs: spec.duration_secs,
107 report_every: spec.report_every,
108 hold_on: spec.hold_on.clone(),
109 hold_timeout_secs: spec.hold_timeout_secs,
110 status: EnvelopeStatus::Active,
111 last_report_at: 0,
112 created_at: now.clone(),
113 expires_at,
114 updated_at: now,
115 })
116 }
117
118 pub async fn get_active_for_actor(
120 &self,
121 actor_id: &str,
122 ) -> KernelResult<Option<EnvelopeRecord>> {
123 let row = sqlx::query(
124 r#"
125 SELECT envelope_id, actor_id, grantor_id, parent_envelope_id,
126 budget, budget_consumed, targets, actions,
127 duration_secs, report_every,
128 hold_on, hold_timeout_secs,
129 status, last_report_at,
130 created_at, expires_at, updated_at
131 FROM envelopes
132 WHERE actor_id = ?1 AND status = 'active'
133 ORDER BY created_at DESC
134 LIMIT 1
135 "#,
136 )
137 .bind(actor_id)
138 .fetch_optional(&self.pool)
139 .await?;
140
141 match row {
142 Some(row) => Ok(Some(row_to_envelope_record(&row)?)),
143 None => Ok(None),
144 }
145 }
146
147 pub async fn get(&self, envelope_id: &str) -> KernelResult<Option<EnvelopeRecord>> {
149 let row = sqlx::query(
150 r#"
151 SELECT envelope_id, actor_id, grantor_id, parent_envelope_id,
152 budget, budget_consumed, targets, actions,
153 duration_secs, report_every,
154 hold_on, hold_timeout_secs,
155 status, last_report_at,
156 created_at, expires_at, updated_at
157 FROM envelopes
158 WHERE envelope_id = ?1
159 "#,
160 )
161 .bind(envelope_id)
162 .fetch_optional(&self.pool)
163 .await?;
164
165 match row {
166 Some(row) => Ok(Some(row_to_envelope_record(&row)?)),
167 None => Ok(None),
168 }
169 }
170
171 pub async fn get_hold_request(&self, hold_id: &str) -> KernelResult<Option<serde_json::Value>> {
173 let row = sqlx::query(
174 r#"
175 SELECT hold_id, envelope_id, agent_id, trigger_target, trigger_action,
176 pending_payload, status, decision, instruction, triggered_at, resolved_at
177 FROM hold_requests
178 WHERE hold_id = ?1
179 "#,
180 )
181 .bind(hold_id)
182 .fetch_optional(&self.pool)
183 .await?;
184
185 match row {
186 Some(r) => Ok(Some(serde_json::json!({
187 "hold_id": r.get::<String, _>("hold_id"),
188 "envelope_id": r.get::<String, _>("envelope_id"),
189 "agent_id": r.get::<String, _>("agent_id"),
190 "trigger_target": r.get::<String, _>("trigger_target"),
191 "trigger_action": r.get::<String, _>("trigger_action"),
192 "pending_payload": serde_json::from_str::<serde_json::Value>(
193 &r.get::<String, _>("pending_payload")
194 ).unwrap_or(serde_json::Value::Null),
195 "status": r.get::<String, _>("status"),
196 "decision": r.get::<Option<String>, _>("decision"),
197 "instruction": r.get::<Option<String>, _>("instruction"),
198 "triggered_at": r.get::<String, _>("triggered_at"),
199 "resolved_at": r.get::<Option<String>, _>("resolved_at"),
200 }))),
201 None => Ok(None),
202 }
203 }
204
205 pub async fn list_pending_holds(
210 &self,
211 agent_id: Option<&str>,
212 ) -> KernelResult<Vec<serde_json::Value>> {
213 let rows = if let Some(id) = agent_id {
214 sqlx::query(
215 r#"
216 SELECT hold_id, envelope_id, agent_id, trigger_target, trigger_action,
217 pending_payload, status, decision, instruction,
218 triggered_at, resolved_at
219 FROM hold_requests
220 WHERE status = 'pending' AND agent_id = ?1
221 ORDER BY triggered_at ASC
222 "#,
223 )
224 .bind(id)
225 .fetch_all(&self.pool)
226 .await?
227 } else {
228 sqlx::query(
229 r#"
230 SELECT hold_id, envelope_id, agent_id, trigger_target, trigger_action,
231 pending_payload, status, decision, instruction,
232 triggered_at, resolved_at
233 FROM hold_requests
234 WHERE status = 'pending'
235 ORDER BY triggered_at ASC
236 "#,
237 )
238 .fetch_all(&self.pool)
239 .await?
240 };
241
242 let mut result = Vec::new();
243 for row in rows {
244 let pending_payload: serde_json::Value =
245 serde_json::from_str(&row.get::<String, _>("pending_payload"))
246 .unwrap_or(serde_json::Value::Null);
247 result.push(serde_json::json!({
248 "hold_id": row.get::<String, _>("hold_id"),
249 "envelope_id": row.get::<String, _>("envelope_id"),
250 "agent_id": row.get::<String, _>("agent_id"),
251 "trigger_target": row.get::<String, _>("trigger_target"),
252 "trigger_action": row.get::<String, _>("trigger_action"),
253 "pending_payload": pending_payload,
254 "status": row.get::<String, _>("status"),
255 "decision": row.get::<Option<String>, _>("decision"),
256 "instruction": row.get::<Option<String>, _>("instruction"),
257 "triggered_at": row.get::<String, _>("triggered_at"),
258 "resolved_at": row.get::<Option<String>, _>("resolved_at"),
259 }));
260 }
261 Ok(result)
262 }
263
264 pub async fn list_pending_holds_for_envelope(
266 &self,
267 envelope_id: &str,
268 ) -> KernelResult<Vec<serde_json::Value>> {
269 let rows = sqlx::query(
270 r#"
271 SELECT hold_id, envelope_id, agent_id, trigger_target, trigger_action,
272 pending_payload, status, decision, instruction,
273 triggered_at, resolved_at
274 FROM hold_requests
275 WHERE status = 'pending' AND envelope_id = ?1
276 ORDER BY triggered_at ASC
277 "#,
278 )
279 .bind(envelope_id)
280 .fetch_all(&self.pool)
281 .await?;
282
283 let mut result = Vec::new();
284 for row in rows {
285 let pending_payload: serde_json::Value =
286 serde_json::from_str(&row.get::<String, _>("pending_payload"))
287 .unwrap_or(serde_json::Value::Null);
288 result.push(serde_json::json!({
289 "hold_id": row.get::<String, _>("hold_id"),
290 "envelope_id": row.get::<String, _>("envelope_id"),
291 "agent_id": row.get::<String, _>("agent_id"),
292 "trigger_target": row.get::<String, _>("trigger_target"),
293 "trigger_action": row.get::<String, _>("trigger_action"),
294 "pending_payload": pending_payload,
295 "status": row.get::<String, _>("status"),
296 "decision": row.get::<Option<String>, _>("decision"),
297 "instruction": row.get::<Option<String>, _>("instruction"),
298 "triggered_at": row.get::<String, _>("triggered_at"),
299 "resolved_at": row.get::<Option<String>, _>("resolved_at"),
300 }));
301 }
302 Ok(result)
303 }
304
305 pub async fn create_hold_request_in_tx(
307 &self,
308 tx: &mut Transaction<'_, Sqlite>,
309 req: &NewHoldRequest<'_>,
310 ) -> KernelResult<()> {
311 let payload_json = serde_json::to_string(req.pending_payload)?;
312 let now = now_millis_string();
313 sqlx::query(
314 r#"
315 INSERT INTO hold_requests (
316 hold_id, envelope_id, agent_id, trigger_target, trigger_action,
317 pending_payload, status, triggered_at
318 )
319 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 'pending', ?7)
320 "#,
321 )
322 .bind(req.hold_id)
323 .bind(req.envelope_id)
324 .bind(req.agent_id)
325 .bind(req.trigger_target)
326 .bind(req.trigger_action)
327 .bind(&payload_json)
328 .bind(&now)
329 .execute(&mut **tx)
330 .await?;
331 Ok(())
332 }
333
334 pub async fn resolve_hold_request_in_tx(
339 &self,
340 tx: &mut Transaction<'_, Sqlite>,
341 hold_id: &str,
342 decision: &str,
343 instruction: Option<&str>,
344 ) -> KernelResult<()> {
345 let status = match decision {
347 "approve" => "approved",
348 "reject" => "rejected",
349 "timed_out" => "timed_out",
350 other => {
351 return Err(KernelError::PolicyViolation(format!(
352 "invalid hold decision: {other}; must be 'approve', 'reject', or 'timed_out'"
353 )));
354 }
355 };
356
357 let now = now_millis_string();
358 let result = sqlx::query(
359 r#"
360 UPDATE hold_requests
361 SET status = ?1, decision = ?2, instruction = ?3, resolved_at = ?4
362 WHERE hold_id = ?5 AND status = 'pending'
363 "#,
364 )
365 .bind(status)
366 .bind(decision)
367 .bind(instruction)
368 .bind(&now)
369 .bind(hold_id)
370 .execute(&mut **tx)
371 .await?;
372
373 if result.rows_affected() == 0 {
374 return Err(KernelError::PolicyViolation(format!(
375 "hold_request {hold_id} not found or already resolved"
376 )));
377 }
378 Ok(())
379 }
380
381 pub async fn consume_budget_in_tx(
384 &self,
385 tx: &mut Transaction<'_, Sqlite>,
386 envelope_id: &str,
387 amount: i64,
388 ) -> KernelResult<i64> {
389 let row =
390 sqlx::query("SELECT budget, budget_consumed FROM envelopes WHERE envelope_id = ?1")
391 .bind(envelope_id)
392 .fetch_optional(&mut **tx)
393 .await?;
394
395 let Some(row) = row else {
396 return Err(KernelError::PolicyViolation(format!(
397 "envelope not found: {envelope_id}"
398 )));
399 };
400
401 let budget: i64 = row.get("budget");
402 let consumed: i64 = row.get("budget_consumed");
403 let new_consumed = consumed + amount;
404
405 if new_consumed > budget {
407 return Err(KernelError::AuthorizationRequired(format!(
408 "envelope {} budget exceeded: budget={}, already_consumed={}, requested={}",
409 envelope_id, budget, consumed, amount
410 )));
411 }
412
413 let now = now_millis_string();
414 sqlx::query(
415 r#"
416 UPDATE envelopes
417 SET budget_consumed = ?1, updated_at = ?2
418 WHERE envelope_id = ?3
419 "#,
420 )
421 .bind(new_consumed)
422 .bind(&now)
423 .bind(envelope_id)
424 .execute(&mut **tx)
425 .await?;
426
427 Ok(new_consumed)
428 }
429
430 pub async fn set_status_in_tx(
432 &self,
433 tx: &mut Transaction<'_, Sqlite>,
434 envelope_id: &str,
435 status: &EnvelopeStatus,
436 ) -> KernelResult<()> {
437 let now = now_millis_string();
438 let result = sqlx::query(
439 r#"
440 UPDATE envelopes
441 SET status = ?1, updated_at = ?2
442 WHERE envelope_id = ?3
443 "#,
444 )
445 .bind(status.as_str())
446 .bind(&now)
447 .bind(envelope_id)
448 .execute(&mut **tx)
449 .await?;
450
451 if result.rows_affected() == 0 {
452 return Err(KernelError::PolicyViolation(format!(
453 "envelope not found: {envelope_id}"
454 )));
455 }
456 Ok(())
457 }
458
459 pub async fn set_status(&self, envelope_id: &str, status: &EnvelopeStatus) -> KernelResult<()> {
461 let mut tx = self.pool.begin().await?;
462 self.set_status_in_tx(&mut tx, envelope_id, status).await?;
463 tx.commit().await?;
464 Ok(())
465 }
466
467 pub async fn check_and_expire(&self, envelope_id: &str) -> KernelResult<bool> {
469 let envelope = self.get(envelope_id).await?;
470 let Some(envelope) = envelope else {
471 return Ok(false);
472 };
473
474 if envelope.status != EnvelopeStatus::Active {
475 return Ok(false);
476 }
477
478 let now_ms = now_millis();
479 if punkgo_core::consent::is_envelope_expired(&envelope, now_ms) {
480 self.set_status(envelope_id, &EnvelopeStatus::Expired)
481 .await?;
482 return Ok(true);
483 }
484
485 Ok(false)
486 }
487
488 pub async fn update_checkpoint_tracking(
490 &self,
491 envelope_id: &str,
492 last_report_at: i64,
493 ) -> KernelResult<()> {
494 let now = now_millis_string();
495 sqlx::query(
496 "UPDATE envelopes SET last_report_at = ?1, updated_at = ?2 WHERE envelope_id = ?3",
497 )
498 .bind(last_report_at)
499 .bind(&now)
500 .bind(envelope_id)
501 .execute(&self.pool)
502 .await?;
503 Ok(())
504 }
505}
506
507fn row_to_envelope_record(row: &sqlx::sqlite::SqliteRow) -> KernelResult<EnvelopeRecord> {
512 let status_str: String = row.get("status");
513 let targets_json: String = row.get("targets");
514 let actions_json: String = row.get("actions");
515 let hold_on_json: String = row.get("hold_on");
516
517 let status = EnvelopeStatus::parse(&status_str).ok_or_else(|| {
518 KernelError::PolicyViolation(format!("invalid envelope status in DB: {status_str}"))
519 })?;
520 let targets: Vec<String> = serde_json::from_str(&targets_json)
521 .map_err(|e| KernelError::PolicyViolation(format!("invalid targets JSON: {e}")))?;
522 let actions: Vec<String> = serde_json::from_str(&actions_json)
523 .map_err(|e| KernelError::PolicyViolation(format!("invalid actions JSON: {e}")))?;
524 let hold_on: Vec<HoldRule> = serde_json::from_str(&hold_on_json)
525 .map_err(|e| KernelError::PolicyViolation(format!("invalid hold_on JSON: {e}")))?;
526
527 Ok(EnvelopeRecord {
528 envelope_id: row.get("envelope_id"),
529 actor_id: row.get("actor_id"),
530 grantor_id: row.get("grantor_id"),
531 parent_envelope_id: row.get("parent_envelope_id"),
532 budget: row.get("budget"),
533 budget_consumed: row.get("budget_consumed"),
534 targets,
535 actions,
536 duration_secs: row.get("duration_secs"),
537 report_every: row.get("report_every"),
538 hold_on,
539 hold_timeout_secs: row.get("hold_timeout_secs"),
540 status,
541 last_report_at: row.get("last_report_at"),
542 created_at: row.get("created_at"),
543 expires_at: row.get("expires_at"),
544 updated_at: row.get("updated_at"),
545 })
546}
547
548fn now_millis_string() -> String {
549 let now = std::time::SystemTime::now()
550 .duration_since(std::time::UNIX_EPOCH)
551 .unwrap_or_default();
552 now.as_millis().to_string()
553}
554
555fn now_millis() -> u64 {
556 std::time::SystemTime::now()
557 .duration_since(std::time::UNIX_EPOCH)
558 .unwrap_or_default()
559 .as_millis() as u64
560}