Skip to main content

punkgo_kernel/state/
envelope_store.rs

1//! Envelope persistence layer — CRUD operations on the `envelopes` table.
2//!
3//! Covers: PIP-001 §11 (envelope: budget + targets + actions + duration + checkpoint + hold).
4//! Whitepaper §3 invariant 6: authorization changes enter event history.
5
6use sqlx::{Row, Sqlite, SqlitePool, Transaction};
7
8use punkgo_core::consent::{EnvelopeRecord, EnvelopeSpec, EnvelopeStatus, HoldRule};
9use punkgo_core::errors::{KernelError, KernelResult};
10
11/// PIP-001 §11b: Parameters for creating a hold_request record.
12///
13/// Groups the data fields passed to [`EnvelopeStore::create_hold_request_in_tx`]
14/// so that the function signature stays within clippy's argument-count limit.
15pub struct NewHoldRequest<'a> {
16    /// Stable identifier for this hold (UUID string).
17    pub hold_id: &'a str,
18    /// The envelope that was held.
19    pub envelope_id: &'a str,
20    /// The agent whose action triggered the hold.
21    pub agent_id: &'a str,
22    /// The target path of the triggering action.
23    pub trigger_target: &'a str,
24    /// The action type string of the triggering action.
25    pub trigger_action: &'a str,
26    /// Full payload snapshot of the pending action, stored as JSON.
27    pub pending_payload: &'a serde_json::Value,
28}
29
/// Persistence handle for the `envelopes` and `hold_requests` tables.
///
/// Holds the `SqlitePool` used by the non-transactional methods; the
/// `*_in_tx` methods run on a caller-supplied transaction instead.
#[derive(Clone)]
pub struct EnvelopeStore {
    /// Connection pool used for every query that is not given a transaction.
    pool: SqlitePool,
}
34
35impl EnvelopeStore {
36    pub fn new(pool: SqlitePool) -> Self {
37        Self { pool }
38    }
39
40    /// Create a new envelope within an existing transaction.
41    pub async fn create_in_tx(
42        &self,
43        tx: &mut Transaction<'_, Sqlite>,
44        envelope_id: &str,
45        spec: &EnvelopeSpec,
46        parent_envelope_id: Option<&str>,
47    ) -> KernelResult<EnvelopeRecord> {
48        let targets_json = serde_json::to_string(&spec.targets).map_err(|e| {
49            KernelError::PolicyViolation(format!("failed to serialize targets: {e}"))
50        })?;
51        let actions_json = serde_json::to_string(&spec.actions).map_err(|e| {
52            KernelError::PolicyViolation(format!("failed to serialize actions: {e}"))
53        })?;
54        let hold_on_json = serde_json::to_string(&spec.hold_on).map_err(|e| {
55            KernelError::PolicyViolation(format!("failed to serialize hold_on: {e}"))
56        })?;
57        let now = now_millis_string();
58
59        // Compute expires_at from duration_secs
60        let expires_at = spec.duration_secs.map(|d| {
61            let now_ms: u64 = now.parse().unwrap_or(0);
62            let expires_ms = now_ms + (d as u64) * 1000;
63            expires_ms.to_string()
64        });
65
66        sqlx::query(
67            r#"
68            INSERT INTO envelopes (
69                envelope_id, actor_id, grantor_id, parent_envelope_id,
70                budget, budget_consumed, targets, actions,
71                duration_secs, report_every,
72                hold_on, hold_timeout_secs,
73                status, last_report_at,
74                created_at, expires_at, updated_at
75            )
76            VALUES (?1, ?2, ?3, ?4, ?5, 0, ?6, ?7, ?8, ?9,
77                    ?10, ?11,
78                    'active', 0, ?12, ?13, ?12)
79            "#,
80        )
81        .bind(envelope_id)
82        .bind(&spec.actor_id)
83        .bind(&spec.grantor_id)
84        .bind(parent_envelope_id)
85        .bind(spec.budget)
86        .bind(&targets_json)
87        .bind(&actions_json)
88        .bind(spec.duration_secs)
89        .bind(spec.report_every)
90        .bind(&hold_on_json)
91        .bind(spec.hold_timeout_secs)
92        .bind(&now)
93        .bind(&expires_at)
94        .execute(&mut **tx)
95        .await?;
96
97        Ok(EnvelopeRecord {
98            envelope_id: envelope_id.to_string(),
99            actor_id: spec.actor_id.clone(),
100            grantor_id: spec.grantor_id.clone(),
101            parent_envelope_id: parent_envelope_id.map(|s| s.to_string()),
102            budget: spec.budget,
103            budget_consumed: 0,
104            targets: spec.targets.clone(),
105            actions: spec.actions.clone(),
106            duration_secs: spec.duration_secs,
107            report_every: spec.report_every,
108            hold_on: spec.hold_on.clone(),
109            hold_timeout_secs: spec.hold_timeout_secs,
110            status: EnvelopeStatus::Active,
111            last_report_at: 0,
112            created_at: now.clone(),
113            expires_at,
114            updated_at: now,
115        })
116    }
117
118    /// Get the active envelope for an actor. Returns None if no active envelope exists.
119    pub async fn get_active_for_actor(
120        &self,
121        actor_id: &str,
122    ) -> KernelResult<Option<EnvelopeRecord>> {
123        let row = sqlx::query(
124            r#"
125            SELECT envelope_id, actor_id, grantor_id, parent_envelope_id,
126                   budget, budget_consumed, targets, actions,
127                   duration_secs, report_every,
128                   hold_on, hold_timeout_secs,
129                   status, last_report_at,
130                   created_at, expires_at, updated_at
131            FROM envelopes
132            WHERE actor_id = ?1 AND status = 'active'
133            ORDER BY created_at DESC
134            LIMIT 1
135            "#,
136        )
137        .bind(actor_id)
138        .fetch_optional(&self.pool)
139        .await?;
140
141        match row {
142            Some(row) => Ok(Some(row_to_envelope_record(&row)?)),
143            None => Ok(None),
144        }
145    }
146
147    /// Get an envelope by ID.
148    pub async fn get(&self, envelope_id: &str) -> KernelResult<Option<EnvelopeRecord>> {
149        let row = sqlx::query(
150            r#"
151            SELECT envelope_id, actor_id, grantor_id, parent_envelope_id,
152                   budget, budget_consumed, targets, actions,
153                   duration_secs, report_every,
154                   hold_on, hold_timeout_secs,
155                   status, last_report_at,
156                   created_at, expires_at, updated_at
157            FROM envelopes
158            WHERE envelope_id = ?1
159            "#,
160        )
161        .bind(envelope_id)
162        .fetch_optional(&self.pool)
163        .await?;
164
165        match row {
166            Some(row) => Ok(Some(row_to_envelope_record(&row)?)),
167            None => Ok(None),
168        }
169    }
170
171    /// PIP-001 §11b: Get a hold_request record by hold_id.
172    pub async fn get_hold_request(&self, hold_id: &str) -> KernelResult<Option<serde_json::Value>> {
173        let row = sqlx::query(
174            r#"
175            SELECT hold_id, envelope_id, agent_id, trigger_target, trigger_action,
176                   pending_payload, status, decision, instruction, triggered_at, resolved_at
177            FROM hold_requests
178            WHERE hold_id = ?1
179            "#,
180        )
181        .bind(hold_id)
182        .fetch_optional(&self.pool)
183        .await?;
184
185        match row {
186            Some(r) => Ok(Some(serde_json::json!({
187                "hold_id": r.get::<String, _>("hold_id"),
188                "envelope_id": r.get::<String, _>("envelope_id"),
189                "agent_id": r.get::<String, _>("agent_id"),
190                "trigger_target": r.get::<String, _>("trigger_target"),
191                "trigger_action": r.get::<String, _>("trigger_action"),
192                "pending_payload": serde_json::from_str::<serde_json::Value>(
193                    &r.get::<String, _>("pending_payload")
194                ).unwrap_or(serde_json::Value::Null),
195                "status": r.get::<String, _>("status"),
196                "decision": r.get::<Option<String>, _>("decision"),
197                "instruction": r.get::<Option<String>, _>("instruction"),
198                "triggered_at": r.get::<String, _>("triggered_at"),
199                "resolved_at": r.get::<Option<String>, _>("resolved_at"),
200            }))),
201            None => Ok(None),
202        }
203    }
204
205    /// PIP-001 §11b: List pending hold_requests, optionally filtered by agent_id.
206    ///
207    /// Returns records ordered by `triggered_at` ascending (oldest first).
208    /// If `agent_id` is `None`, all pending holds are returned regardless of agent.
209    pub async fn list_pending_holds(
210        &self,
211        agent_id: Option<&str>,
212    ) -> KernelResult<Vec<serde_json::Value>> {
213        let rows = if let Some(id) = agent_id {
214            sqlx::query(
215                r#"
216                SELECT hold_id, envelope_id, agent_id, trigger_target, trigger_action,
217                       pending_payload, status, decision, instruction,
218                       triggered_at, resolved_at
219                FROM hold_requests
220                WHERE status = 'pending' AND agent_id = ?1
221                ORDER BY triggered_at ASC
222                "#,
223            )
224            .bind(id)
225            .fetch_all(&self.pool)
226            .await?
227        } else {
228            sqlx::query(
229                r#"
230                SELECT hold_id, envelope_id, agent_id, trigger_target, trigger_action,
231                       pending_payload, status, decision, instruction,
232                       triggered_at, resolved_at
233                FROM hold_requests
234                WHERE status = 'pending'
235                ORDER BY triggered_at ASC
236                "#,
237            )
238            .fetch_all(&self.pool)
239            .await?
240        };
241
242        let mut result = Vec::new();
243        for row in rows {
244            let pending_payload: serde_json::Value =
245                serde_json::from_str(&row.get::<String, _>("pending_payload"))
246                    .unwrap_or(serde_json::Value::Null);
247            result.push(serde_json::json!({
248                "hold_id":        row.get::<String, _>("hold_id"),
249                "envelope_id":    row.get::<String, _>("envelope_id"),
250                "agent_id":       row.get::<String, _>("agent_id"),
251                "trigger_target": row.get::<String, _>("trigger_target"),
252                "trigger_action": row.get::<String, _>("trigger_action"),
253                "pending_payload": pending_payload,
254                "status":         row.get::<String, _>("status"),
255                "decision":       row.get::<Option<String>, _>("decision"),
256                "instruction":    row.get::<Option<String>, _>("instruction"),
257                "triggered_at":   row.get::<String, _>("triggered_at"),
258                "resolved_at":    row.get::<Option<String>, _>("resolved_at"),
259            }));
260        }
261        Ok(result)
262    }
263
264    /// PIP-001 §11e: List pending hold_requests for a specific envelope.
265    pub async fn list_pending_holds_for_envelope(
266        &self,
267        envelope_id: &str,
268    ) -> KernelResult<Vec<serde_json::Value>> {
269        let rows = sqlx::query(
270            r#"
271            SELECT hold_id, envelope_id, agent_id, trigger_target, trigger_action,
272                   pending_payload, status, decision, instruction,
273                   triggered_at, resolved_at
274            FROM hold_requests
275            WHERE status = 'pending' AND envelope_id = ?1
276            ORDER BY triggered_at ASC
277            "#,
278        )
279        .bind(envelope_id)
280        .fetch_all(&self.pool)
281        .await?;
282
283        let mut result = Vec::new();
284        for row in rows {
285            let pending_payload: serde_json::Value =
286                serde_json::from_str(&row.get::<String, _>("pending_payload"))
287                    .unwrap_or(serde_json::Value::Null);
288            result.push(serde_json::json!({
289                "hold_id":        row.get::<String, _>("hold_id"),
290                "envelope_id":    row.get::<String, _>("envelope_id"),
291                "agent_id":       row.get::<String, _>("agent_id"),
292                "trigger_target": row.get::<String, _>("trigger_target"),
293                "trigger_action": row.get::<String, _>("trigger_action"),
294                "pending_payload": pending_payload,
295                "status":         row.get::<String, _>("status"),
296                "decision":       row.get::<Option<String>, _>("decision"),
297                "instruction":    row.get::<Option<String>, _>("instruction"),
298                "triggered_at":   row.get::<String, _>("triggered_at"),
299                "resolved_at":    row.get::<Option<String>, _>("resolved_at"),
300            }));
301        }
302        Ok(result)
303    }
304
305    /// PIP-001 §11b: Create a hold_request record within a transaction.
306    pub async fn create_hold_request_in_tx(
307        &self,
308        tx: &mut Transaction<'_, Sqlite>,
309        req: &NewHoldRequest<'_>,
310    ) -> KernelResult<()> {
311        let payload_json = serde_json::to_string(req.pending_payload)?;
312        let now = now_millis_string();
313        sqlx::query(
314            r#"
315            INSERT INTO hold_requests (
316                hold_id, envelope_id, agent_id, trigger_target, trigger_action,
317                pending_payload, status, triggered_at
318            )
319            VALUES (?1, ?2, ?3, ?4, ?5, ?6, 'pending', ?7)
320            "#,
321        )
322        .bind(req.hold_id)
323        .bind(req.envelope_id)
324        .bind(req.agent_id)
325        .bind(req.trigger_target)
326        .bind(req.trigger_action)
327        .bind(&payload_json)
328        .bind(&now)
329        .execute(&mut **tx)
330        .await?;
331        Ok(())
332    }
333
334    /// PIP-001 §11d: Resolve a hold_request (approve or reject) within a transaction.
335    ///
336    /// `decision` is "approve" or "reject" (PIP-001 §11d HoldResponse).
337    /// The DB `status` column uses the past-tense forms "approved"/"rejected".
338    pub async fn resolve_hold_request_in_tx(
339        &self,
340        tx: &mut Transaction<'_, Sqlite>,
341        hold_id: &str,
342        decision: &str,
343        instruction: Option<&str>,
344    ) -> KernelResult<()> {
345        // Map decision → status (DB uses past-tense forms per CHECK constraint).
346        let status = match decision {
347            "approve" => "approved",
348            "reject" => "rejected",
349            "timed_out" => "timed_out",
350            other => {
351                return Err(KernelError::PolicyViolation(format!(
352                    "invalid hold decision: {other}; must be 'approve', 'reject', or 'timed_out'"
353                )));
354            }
355        };
356
357        let now = now_millis_string();
358        let result = sqlx::query(
359            r#"
360            UPDATE hold_requests
361            SET status = ?1, decision = ?2, instruction = ?3, resolved_at = ?4
362            WHERE hold_id = ?5 AND status = 'pending'
363            "#,
364        )
365        .bind(status)
366        .bind(decision)
367        .bind(instruction)
368        .bind(&now)
369        .bind(hold_id)
370        .execute(&mut **tx)
371        .await?;
372
373        if result.rows_affected() == 0 {
374            return Err(KernelError::PolicyViolation(format!(
375                "hold_request {hold_id} not found or already resolved"
376            )));
377        }
378        Ok(())
379    }
380
381    /// Consume budget from an envelope within a transaction.
382    /// Returns the new total consumed amount.
383    pub async fn consume_budget_in_tx(
384        &self,
385        tx: &mut Transaction<'_, Sqlite>,
386        envelope_id: &str,
387        amount: i64,
388    ) -> KernelResult<i64> {
389        let row =
390            sqlx::query("SELECT budget, budget_consumed FROM envelopes WHERE envelope_id = ?1")
391                .bind(envelope_id)
392                .fetch_optional(&mut **tx)
393                .await?;
394
395        let Some(row) = row else {
396            return Err(KernelError::PolicyViolation(format!(
397                "envelope not found: {envelope_id}"
398            )));
399        };
400
401        let budget: i64 = row.get("budget");
402        let consumed: i64 = row.get("budget_consumed");
403        let new_consumed = consumed + amount;
404
405        // Allow consuming up to budget (halt is handled at checkpoint level)
406        if new_consumed > budget {
407            return Err(KernelError::AuthorizationRequired(format!(
408                "envelope {} budget exceeded: budget={}, already_consumed={}, requested={}",
409                envelope_id, budget, consumed, amount
410            )));
411        }
412
413        let now = now_millis_string();
414        sqlx::query(
415            r#"
416            UPDATE envelopes
417            SET budget_consumed = ?1, updated_at = ?2
418            WHERE envelope_id = ?3
419            "#,
420        )
421        .bind(new_consumed)
422        .bind(&now)
423        .bind(envelope_id)
424        .execute(&mut **tx)
425        .await?;
426
427        Ok(new_consumed)
428    }
429
430    /// Set envelope status within a transaction.
431    pub async fn set_status_in_tx(
432        &self,
433        tx: &mut Transaction<'_, Sqlite>,
434        envelope_id: &str,
435        status: &EnvelopeStatus,
436    ) -> KernelResult<()> {
437        let now = now_millis_string();
438        let result = sqlx::query(
439            r#"
440            UPDATE envelopes
441            SET status = ?1, updated_at = ?2
442            WHERE envelope_id = ?3
443            "#,
444        )
445        .bind(status.as_str())
446        .bind(&now)
447        .bind(envelope_id)
448        .execute(&mut **tx)
449        .await?;
450
451        if result.rows_affected() == 0 {
452            return Err(KernelError::PolicyViolation(format!(
453                "envelope not found: {envelope_id}"
454            )));
455        }
456        Ok(())
457    }
458
459    /// Set envelope status without a transaction (convenience method).
460    pub async fn set_status(&self, envelope_id: &str, status: &EnvelopeStatus) -> KernelResult<()> {
461        let mut tx = self.pool.begin().await?;
462        self.set_status_in_tx(&mut tx, envelope_id, status).await?;
463        tx.commit().await?;
464        Ok(())
465    }
466
467    /// Lazy expiry check: if envelope is time-expired, mark as expired and return true.
468    pub async fn check_and_expire(&self, envelope_id: &str) -> KernelResult<bool> {
469        let envelope = self.get(envelope_id).await?;
470        let Some(envelope) = envelope else {
471            return Ok(false);
472        };
473
474        if envelope.status != EnvelopeStatus::Active {
475            return Ok(false);
476        }
477
478        let now_ms = now_millis();
479        if punkgo_core::consent::is_envelope_expired(&envelope, now_ms) {
480            self.set_status(envelope_id, &EnvelopeStatus::Expired)
481                .await?;
482            return Ok(true);
483        }
484
485        Ok(false)
486    }
487
488    /// Update checkpoint tracking fields.
489    pub async fn update_checkpoint_tracking(
490        &self,
491        envelope_id: &str,
492        last_report_at: i64,
493    ) -> KernelResult<()> {
494        let now = now_millis_string();
495        sqlx::query(
496            "UPDATE envelopes SET last_report_at = ?1, updated_at = ?2 WHERE envelope_id = ?3",
497        )
498        .bind(last_report_at)
499        .bind(&now)
500        .bind(envelope_id)
501        .execute(&self.pool)
502        .await?;
503        Ok(())
504    }
505}
506
507// ---------------------------------------------------------------------------
508// Helpers
509// ---------------------------------------------------------------------------
510
511fn row_to_envelope_record(row: &sqlx::sqlite::SqliteRow) -> KernelResult<EnvelopeRecord> {
512    let status_str: String = row.get("status");
513    let targets_json: String = row.get("targets");
514    let actions_json: String = row.get("actions");
515    let hold_on_json: String = row.get("hold_on");
516
517    let status = EnvelopeStatus::parse(&status_str).ok_or_else(|| {
518        KernelError::PolicyViolation(format!("invalid envelope status in DB: {status_str}"))
519    })?;
520    let targets: Vec<String> = serde_json::from_str(&targets_json)
521        .map_err(|e| KernelError::PolicyViolation(format!("invalid targets JSON: {e}")))?;
522    let actions: Vec<String> = serde_json::from_str(&actions_json)
523        .map_err(|e| KernelError::PolicyViolation(format!("invalid actions JSON: {e}")))?;
524    let hold_on: Vec<HoldRule> = serde_json::from_str(&hold_on_json)
525        .map_err(|e| KernelError::PolicyViolation(format!("invalid hold_on JSON: {e}")))?;
526
527    Ok(EnvelopeRecord {
528        envelope_id: row.get("envelope_id"),
529        actor_id: row.get("actor_id"),
530        grantor_id: row.get("grantor_id"),
531        parent_envelope_id: row.get("parent_envelope_id"),
532        budget: row.get("budget"),
533        budget_consumed: row.get("budget_consumed"),
534        targets,
535        actions,
536        duration_secs: row.get("duration_secs"),
537        report_every: row.get("report_every"),
538        hold_on,
539        hold_timeout_secs: row.get("hold_timeout_secs"),
540        status,
541        last_report_at: row.get("last_report_at"),
542        created_at: row.get("created_at"),
543        expires_at: row.get("expires_at"),
544        updated_at: row.get("updated_at"),
545    })
546}
547
/// Current wall-clock time as whole milliseconds since the Unix epoch,
/// rendered as a decimal string.
///
/// If the system clock reads earlier than the epoch the result is `"0"`.
fn now_millis_string() -> String {
    let since_epoch = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => std::time::Duration::default(),
    };
    let millis: u128 = since_epoch.as_millis();
    millis.to_string()
}
554
/// Current wall-clock time as whole milliseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch the result is `0`.
fn now_millis() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis() as u64)
        .unwrap_or(0)
}