Skip to main content

punkgo_kernel/runtime/
kernel.rs

1use std::path::PathBuf;
2
3use serde::{Deserialize, Serialize};
4use serde_json::{Value, json};
5use tracing::{info, warn};
6use uuid::Uuid;
7
8use crate::audit::AuditLog;
9use punkgo_core::action::{Action, ActionType, payload_hash_hex, quote_cost};
10use punkgo_core::actor::{
11    ActorType, CreateActorSpec, WritableTarget, build_lineage, derive_agent_id,
12};
13use punkgo_core::boundary::{check_writable_boundary, validate_child_targets};
14use punkgo_core::consent::{self, AuthorizationMode, CheckpointLevel, EnvelopeSpec, HoldRule};
15use punkgo_core::errors::{KernelError, KernelResult};
16use punkgo_core::policy::{check_read_access, validate_action};
17
18use super::lifecycle;
19use crate::state::{
20    ActorStore, EnergyLedger, EnergyReservation, EnvelopeStore, EventLog, EventRecord,
21    NewHoldRequest, StateStore,
22};
23use punkgo_core::protocol::{RequestEnvelope, RequestType, ResponseEnvelope};
24use punkgo_core::stellar::{StellarConfig, load_stellar_config};
25
26/// Configuration for bootstrapping the kernel.
27#[derive(Debug, Clone)]
28pub struct KernelConfig {
29    pub state_dir: PathBuf,
30    pub ipc_endpoint: String,
31}
32
33impl Default for KernelConfig {
34    fn default() -> Self {
35        let state_dir = std::env::var("PUNKGO_STATE_DIR")
36            .map(PathBuf::from)
37            .unwrap_or_else(|_| default_state_dir());
38        Self {
39            state_dir,
40            ipc_endpoint: "punkgo-kernel".to_string(),
41        }
42    }
43}
44
45/// Default state directory: ~/.punkgo/state
46///
47/// Falls back to `./state` if home directory cannot be determined.
48fn default_state_dir() -> PathBuf {
49    if let Some(home) = home_dir() {
50        return home.join(".punkgo").join("state");
51    }
52    PathBuf::from("state")
53}
54
55fn home_dir() -> Option<PathBuf> {
56    // Unix / WSL
57    if let Some(home) = std::env::var_os("HOME") {
58        return Some(PathBuf::from(home));
59    }
60    // Windows primary
61    if let Some(profile) = std::env::var_os("USERPROFILE") {
62        return Some(PathBuf::from(profile));
63    }
64    // Windows fallback
65    let drive = std::env::var_os("HOMEDRIVE")?;
66    let path = std::env::var_os("HOMEPATH")?;
67    let mut p = PathBuf::from(drive);
68    p.push(path);
69    Some(p)
70}
71
72/// Cryptographic receipt returned after a successful action submission.
73///
74/// Contains the event ID, log index, event hash, and energy costs.
75/// This is the caller's proof that their action was committed to the
76/// append-only history.
77#[derive(Debug, Clone, Serialize)]
78pub struct SubmitReceipt {
79    pub event_id: String,
80    pub log_index: i64,
81    pub event_hash: String,
82    pub reserved_cost: i64,
83    pub settled_cost: i64,
84    pub artifact_hash: Option<String>,
85}
86
87#[derive(Debug, Deserialize)]
88struct ReadQuery {
89    kind: String,
90    actor_id: Option<String>,
91    /// PIP-001 §11: hold_id for hold_info queries.
92    #[serde(default)]
93    hold_id: Option<String>,
94    limit: Option<i64>,
95    /// For audit_inclusion_proof: the event's log_index to prove.
96    log_index: Option<i64>,
97    /// For audit_inclusion_proof / audit_consistency_proof: the tree size to prove against.
98    tree_size: Option<i64>,
99    /// For audit_consistency_proof: the older tree size.
100    old_size: Option<i64>,
101    /// Pagination: only return events with log_index < this value.
102    /// Used for backward pagination (fetching older events).
103    #[serde(default)]
104    before_index: Option<i64>,
105    /// Pagination: only return events with log_index > this value.
106    /// Used for forward pagination (fetching newer events).
107    #[serde(default)]
108    after_index: Option<i64>,
109    /// Identity of the requester. Used by visibility boundary checks.
110    /// Currently optional — when absent, all reads are permitted (single-user).
111    /// When multi-user collaboration is introduced, this becomes required.
112    #[serde(default)]
113    requester_id: Option<String>,
114}
115
116/// The PunkGo kernel — single-writer append-only event system.
117///
118/// The kernel is the **committer** (whitepaper §2): a structural role that
119/// provides a single linearization point for actions. It is not a judge.
120///
121/// Use [`Kernel::bootstrap`] to initialize from a state directory, then
122/// [`Kernel::handle_request`] to process IPC requests.
123pub struct Kernel {
124    state_store: StateStore,
125    energy_ledger: EnergyLedger,
126    event_log: EventLog,
127    audit_log: AuditLog,
128    actor_store: ActorStore,
129    envelope_store: EnvelopeStore,
130    stellar_config: StellarConfig,
131}
132
133impl Kernel {
134    /// Initialize the kernel from a state directory.
135    ///
136    /// Creates the SQLite database, loads stellar config, and prepares the root actor.
137    pub async fn bootstrap(config: &KernelConfig) -> KernelResult<Self> {
138        let state_store = StateStore::bootstrap(&config.state_dir).await?;
139        let energy_ledger = EnergyLedger::new(state_store.pool());
140        let event_log = EventLog::new(state_store.pool());
141        let audit_log = AuditLog::new(state_store.pool(), "punkgo/kernel");
142        let actor_store = ActorStore::new(state_store.pool());
143        let envelope_store = EnvelopeStore::new(state_store.pool());
144
145        // Phase 2: Load stellar configuration (PIP-001 §1).
146        let stellar_config_path = config.state_dir.join("stellar.toml");
147        let stellar_config = load_stellar_config(&stellar_config_path)?;
148        info!(
149            energy_per_tick = stellar_config.effective_energy_per_tick(),
150            int8_tops = stellar_config.int8_tops,
151            tick_interval_ms = stellar_config.tick_interval_ms,
152            "stellar configuration loaded"
153        );
154
155        Ok(Self {
156            state_store,
157            energy_ledger,
158            event_log,
159            audit_log,
160            actor_store,
161            envelope_store,
162            stellar_config,
163        })
164    }
165
166    /// Returns a reference to the stellar configuration for external use
167    /// (e.g., energy producer startup).
168    pub fn stellar_config(&self) -> &StellarConfig {
169        &self.stellar_config
170    }
171
172    /// Returns a clone of the energy ledger for external use (e.g., energy producer).
173    pub fn energy_ledger(&self) -> &EnergyLedger {
174        &self.energy_ledger
175    }
176
177    /// Returns a clone of the actor store for external use (e.g., energy producer).
178    pub fn actor_store(&self) -> &ActorStore {
179        &self.actor_store
180    }
181
182    /// Returns a reference to the envelope store for external use.
183    pub fn envelope_store(&self) -> &EnvelopeStore {
184        &self.envelope_store
185    }
186
187    /// Returns the SQLite pool for external use (e.g., energy producer transaction).
188    pub fn pool(&self) -> sqlx::SqlitePool {
189        self.state_store.pool()
190    }
191
192    /// Handle an incoming IPC request — dispatches to quote, submit, or read.
193    pub async fn handle_request(&self, req: RequestEnvelope) -> ResponseEnvelope {
194        let request_id = req.request_id.clone();
195        info!(
196            request_id = %request_id,
197            request_type = ?req.request_type,
198            "received request"
199        );
200        match self.dispatch(req).await {
201            Ok(payload) => {
202                info!(request_id = %request_id, "request completed");
203                ResponseEnvelope::ok(request_id, payload)
204            }
205            Err(err) => {
206                warn!(error = %err, "request failed");
207                ResponseEnvelope::err_structured(request_id, &err)
208            }
209        }
210    }
211
212    async fn dispatch(&self, req: RequestEnvelope) -> KernelResult<Value> {
213        match req.request_type {
214            RequestType::Quote => {
215                let action: Action = serde_json::from_value(req.payload)?;
216                validate_action(&action)?;
217                let cost = quote_cost(&action);
218                Ok(json!({ "cost": cost }))
219            }
220            RequestType::Submit => {
221                let action: Action = serde_json::from_value(req.payload)?;
222                let receipt = self.submit_action(action).await?;
223                Ok(serde_json::to_value(receipt)?)
224            }
225            RequestType::Read => {
226                let query: ReadQuery = serde_json::from_value(req.payload)?;
227                self.read_query(query).await
228            }
229        }
230    }
231
232    async fn submit_action(&self, action: Action) -> KernelResult<SubmitReceipt> {
233        // Step 1: VALIDATE — policy + actor existence + frozen check
234        validate_action(&action)?;
235        if !self.state_store.actor_exists(&action.actor_id).await? {
236            return Err(KernelError::ActorNotFound(action.actor_id.clone()));
237        }
238
239        // Phase 1: check frozen status via actors table.
240        // Phase 3: check writable boundary (PIP-001 §8/§9).
241        // Phase 4a: check lineage activity (PIP-001 §7) + lifecycle ops.
242        if let Some(actor) = self.actor_store.get(&action.actor_id).await? {
243            if actor.status == punkgo_core::actor::ActorStatus::Frozen
244                && action.action_type.is_state_changing()
245            {
246                return Err(KernelError::ActorFrozen(format!(
247                    "actor {} is frozen and cannot perform state-changing actions",
248                    action.actor_id
249                )));
250            }
251            // Phase 3: Boundary enforcement — check writable_targets.
252            check_writable_boundary(&actor, &action.target, &action.action_type)?;
253
254            // Phase 4a: Check lineage activity for agents (PIP-001 §7).
255            // Agents need their entire creation chain to be active.
256            if actor.actor_type == punkgo_core::actor::ActorType::Agent
257                && action.action_type.is_state_changing()
258            {
259                lifecycle::check_lineage_active(&self.actor_store, &actor.lineage).await?;
260            }
261
262            // Phase 4b: Authorization check (PIP-001 §5/§6/§11).
263            // Agents performing state-changing actions need an active envelope.
264            if action.action_type.is_state_changing() {
265                let envelope = self
266                    .envelope_store
267                    .get_active_for_actor(&action.actor_id)
268                    .await?;
269
270                // Lazy expiry check (PIP-001 §11 checkpoint): if envelope exists but is time-expired, mark it.
271                let envelope = if let Some(env) = envelope {
272                    if consent::is_envelope_expired(&env, now_millis_u64()) {
273                        self.envelope_store
274                            .set_status(&env.envelope_id, &consent::EnvelopeStatus::Expired)
275                            .await?;
276                        None
277                    } else {
278                        Some(env)
279                    }
280                } else {
281                    None
282                };
283
284                let auth_mode = consent::check_authorization(&actor, envelope.as_ref())?;
285
286                // For ManOnTheLoop, also verify envelope covers this specific action.
287                if auth_mode == AuthorizationMode::ManOnTheLoop
288                    && let Some(ref env) = envelope
289                {
290                    consent::check_envelope_covers(
291                        env,
292                        &action.target,
293                        action.action_type.as_str(),
294                    )?;
295
296                    // PIP-001 §11e: Lazy-expire timed-out holds before checking new triggers.
297                    if !env.hold_on.is_empty() {
298                        if let Some(timeout_secs) = env.hold_timeout_secs {
299                            if timeout_secs > 0 {
300                                self.expire_timed_out_holds(&env.envelope_id, timeout_secs)
301                                    .await?;
302                            }
303                        }
304                    }
305
306                    // PIP-001 §11b: Check hold_on rules — if triggered, create a hold_request
307                    // event and reserve energy before execution.
308                    // Skip the hold check if this action was already approved by a Human
309                    // (re-execution after approve; PIP-001 §11d).
310                    let hold_approved = action
311                        .payload
312                        .get("_hold_approved")
313                        .and_then(|v| v.as_bool())
314                        .unwrap_or(false);
315                    if !hold_approved
316                        && consent::check_hold_trigger(
317                            &env.hold_on,
318                            &action.target,
319                            action.action_type.as_str(),
320                        )
321                    {
322                        let hold_id = Uuid::new_v4().to_string();
323
324                        // PIP-001 §11c: Commit = reserve energy (gasLimit model).
325                        // Reserves energy upfront so agent can't overspend while hold is pending.
326                        let reserved_cost = quote_cost(&action) as i64;
327
328                        let hold_payload = json!({
329                            "hold_id": &hold_id,
330                            "agent_id": &action.actor_id,
331                            "trigger": {
332                                "target": &action.target,
333                                "action_type": action.action_type.as_str()
334                            },
335                            "pending_action": {
336                                "target": &action.target,
337                                "action_type": action.action_type.as_str(),
338                                "payload": &action.payload
339                            },
340                            "reserved_cost": reserved_cost,
341                            "triggered_at": now_millis_string()
342                        });
343
344                        // Write hold_request event into history (PIP-001 §11b, whitepaper §3 inv 6).
345                        let hold_action = Action {
346                            actor_id: action.actor_id.clone(),
347                            action_type: ActionType::Mutate,
348                            target: format!("ledger/hold/{hold_id}"),
349                            payload: hold_payload.clone(),
350                            timestamp: None,
351                        };
352                        let mut hold_event = EventRecord {
353                            id: Uuid::new_v4().to_string(),
354                            log_index: 0,
355                            event_hash: String::new(),
356                            actor_id: action.actor_id.clone(),
357                            action_type: "hold_request".to_string(),
358                            target: format!("ledger/hold/{hold_id}"),
359                            payload: hold_payload.clone(),
360                            payload_hash: payload_hash_hex(&hold_action)?,
361                            artifact_hash: None,
362                            reserved_energy: reserved_cost,
363                            settled_energy: 0,
364                            timestamp: now_millis_string(),
365                        };
366
367                        // Atomic: reserve + hold_request + event in one transaction (PIP-001 §11b/§11c).
368                        let pool = self.state_store.pool();
369                        let mut tx = pool.begin().await?;
370                        if reserved_cost > 0 {
371                            self.energy_ledger
372                                .reserve_in_tx(&mut tx, &action.actor_id, reserved_cost)
373                                .await?;
374                        }
375                        self.event_log
376                            .append_in_tx(&mut tx, &mut hold_event)
377                            .await?;
378                        self.envelope_store
379                            .create_hold_request_in_tx(
380                                &mut tx,
381                                &NewHoldRequest {
382                                    hold_id: &hold_id,
383                                    envelope_id: &env.envelope_id,
384                                    agent_id: &action.actor_id,
385                                    trigger_target: &action.target,
386                                    trigger_action: action.action_type.as_str(),
387                                    pending_payload: &json!({
388                                        "target": &action.target,
389                                        "action_type": action.action_type.as_str(),
390                                        "payload": &action.payload,
391                                        "reserved_cost": reserved_cost
392                                    }),
393                                },
394                            )
395                            .await?;
396                        // Envelope stays Active — agent can continue submitting (PIP-001 §11b).
397
398                        // Audit trail — atomic with event (whitepaper §3 invariant 5).
399                        let log_index = hold_event.log_index as u64;
400                        let tree_size = log_index + 1;
401                        self.audit_log
402                            .append_leaf_in_tx(&mut tx, log_index, &hold_event.event_hash)
403                            .await
404                            .map_err(|e| KernelError::Audit(e.to_string()))?;
405                        self.audit_log
406                            .make_checkpoint_in_tx(&mut tx, tree_size)
407                            .await
408                            .map_err(|e| KernelError::Audit(e.to_string()))?;
409
410                        tx.commit().await?;
411
412                        return Err(KernelError::HoldTriggered {
413                            hold_id,
414                            agent_id: action.actor_id.clone(),
415                        });
416                    }
417                }
418            }
419        }
420
421        // PIP-001 §11d: Check for hold_response (approve/reject) operations.
422        // Pattern: mutate "ledger/hold/<hold_id>" with payload { decision, instruction? }
423        let hold_response = if matches!(action.action_type, ActionType::Mutate) {
424            parse_hold_response(&action.target, &action.payload)
425        } else {
426            None
427        };
428
429        // Validate and execute hold_response before entering the normal pipeline.
430        if let Some((ref hold_id, ref decision, ref instruction)) = hold_response {
431            return self
432                .execute_hold_response(&action, hold_id, decision, instruction.as_deref())
433                .await;
434        }
435
436        // Phase 4a: Check for lifecycle operations (freeze/unfreeze/terminate).
437        let lifecycle_op = if matches!(action.action_type, ActionType::Mutate) {
438            lifecycle::parse_lifecycle_op(&action.target, &action.payload)
439        } else {
440            None
441        };
442
443        // Validate lifecycle authorization before proceeding.
444        if let Some((ref target_actor_id, ref op)) = lifecycle_op {
445            let initiator = self
446                .actor_store
447                .get(&action.actor_id)
448                .await?
449                .ok_or_else(|| KernelError::ActorNotFound(action.actor_id.clone()))?;
450            let target_actor = self
451                .actor_store
452                .get(target_actor_id)
453                .await?
454                .ok_or_else(|| KernelError::ActorNotFound(target_actor_id.clone()))?;
455            lifecycle::validate_lifecycle_authorization(&initiator, &target_actor, op).await?;
456        }
457
458        let create_actor_spec = parse_create_actor_spec(&action, &self.actor_store).await?;
459        let create_envelope_spec =
460            parse_create_envelope_spec(&action, &self.envelope_store).await?;
461        let policy_version = parse_policy_version(&action);
462
463        // Step 2: QUOTE + Step 3: RESERVE
464        // PIP-001 §11d: Hold-approved actions already have energy reserved at trigger time.
465        // The sentinel `_hold_reserved_cost` carries the pre-reserved amount so we skip
466        // double-charging. We create a phantom EnergyReservation so settle correctly
467        // reduces reserved_energy without calling reserve() again.
468        let (reserved_cost, reservation) = if let Some(hold_reserved) = action
469            .payload
470            .get("_hold_reserved_cost")
471            .and_then(|v| v.as_i64())
472        {
473            let phantom = if hold_reserved > 0 {
474                Some(EnergyReservation {
475                    actor_id: action.actor_id.clone(),
476                    reserved: hold_reserved,
477                })
478            } else {
479                None
480            };
481            (hold_reserved, phantom)
482        } else {
483            // All actions pay quote_cost (action_cost + append_cost).
484            // For observe: action_cost=0 but append_cost >= 1 (Landauer).
485            let cost = quote_cost(&action) as i64;
486            let res = if cost > 0 {
487                Some(self.energy_ledger.reserve(&action.actor_id, cost).await?)
488            } else {
489                None
490            };
491            (cost, res)
492        };
493
494        // Step 4: VALIDATE EXECUTE PAYLOAD (PIP-002 §2 — for Execute actions only)
495        // The kernel does not execute anything. It validates the actor-submitted
496        // result and records it. Actor executes, kernel records.
497        let artifact_hash = if matches!(action.action_type, ActionType::Execute) {
498            Some(Self::validate_execute_payload(&action.payload)?)
499        } else {
500            None
501        };
502
503        // Step 5: SETTLE
504        // For PIP-002: settled cost equals reserved cost (no post-execution IO adjustment).
505        let settled_cost = reserved_cost;
506
507        // Step 6: APPEND
508        let mut event = EventRecord {
509            id: Uuid::new_v4().to_string(),
510            log_index: 0,
511            event_hash: String::new(),
512            actor_id: action.actor_id.clone(),
513            action_type: action.action_type.as_str().to_string(),
514            target: action.target.clone(),
515            payload: action.payload.clone(),
516            payload_hash: payload_hash_hex(&action)?,
517            artifact_hash: artifact_hash.clone(),
518            reserved_energy: reserved_cost,
519            settled_energy: settled_cost,
520            timestamp: now_millis_string(),
521        };
522        self.finalize_energy_and_event(
523            reservation.as_ref(),
524            settled_cost,
525            create_actor_spec.as_ref(),
526            create_envelope_spec.as_ref(),
527            &mut event,
528        )
529        .await?;
530
531        // Step 7: POST-COMMIT — envelope budget consumption + checkpoints + policy + lifecycle
532        if let Some(ref version) = policy_version {
533            if let Err(err) = self.state_store.set_policy_version(version).await {
534                warn!(error = %err, "failed to record policy version after commit");
535            } else {
536                info!(policy_version = %version, event_id = %event.id, "policy version updated");
537            }
538        }
539
540        // Phase 4b: Post-commit envelope budget consumption and checkpoint (PIP-001 §11).
541        if action.action_type.is_state_changing()
542            && settled_cost > 0
543            && let Ok(Some(actor)) = self.actor_store.get(&action.actor_id).await
544            && actor.actor_type == ActorType::Agent
545            && let Ok(Some(envelope)) = self
546                .envelope_store
547                .get_active_for_actor(&action.actor_id)
548                .await
549        {
550            // Check checkpoint BEFORE consuming (so we use pre-consume state).
551            let checkpoint = consent::check_checkpoint(&envelope, settled_cost);
552
553            // Consume budget.
554            let pool = self.state_store.pool();
555            let mut tx = pool.begin().await?;
556            match self
557                .envelope_store
558                .consume_budget_in_tx(&mut tx, &envelope.envelope_id, settled_cost)
559                .await
560            {
561                Ok(_new_consumed) => {
562                    // Handle checkpoint levels (PIP-001 §11).
563                    match checkpoint {
564                        Some(CheckpointLevel::Halt) => {
565                            self.envelope_store
566                                .set_status_in_tx(
567                                    &mut tx,
568                                    &envelope.envelope_id,
569                                    &consent::EnvelopeStatus::Expired,
570                                )
571                                .await?;
572                            info!(
573                                envelope_id = %envelope.envelope_id,
574                                actor_id = %action.actor_id,
575                                "checkpoint: HALT — envelope budget exhausted"
576                            );
577                        }
578                        Some(CheckpointLevel::Report) => {
579                            info!(
580                                envelope_id = %envelope.envelope_id,
581                                actor_id = %action.actor_id,
582                                budget = envelope.budget,
583                                consumed = envelope.budget_consumed + settled_cost,
584                                "checkpoint: REPORT — summary logged"
585                            );
586                        }
587                        None => {}
588                    }
589                    tx.commit().await?;
590                }
591                Err(err) => {
592                    warn!(
593                        error = %err,
594                        "envelope budget consumption failed (event already committed)"
595                    );
596                    // Transaction is automatically rolled back on drop.
597                }
598            }
599        }
600
601        // Phase 4a: Execute lifecycle operation after event is committed.
602        if let Some((ref target_actor_id, ref op)) = lifecycle_op {
603            let pool = self.state_store.pool();
604            match op {
605                punkgo_core::actor::LifecycleOp::Freeze { .. } => {
606                    match lifecycle::execute_freeze(&self.actor_store, &pool, target_actor_id).await
607                    {
608                        Ok(frozen_ids) => {
609                            info!(
610                                target = %target_actor_id,
611                                cascade_count = frozen_ids.len(),
612                                "lifecycle: freeze executed"
613                            );
614                        }
615                        Err(err) => {
616                            warn!(error = %err, "lifecycle: freeze failed (event already committed)");
617                        }
618                    }
619                }
620                punkgo_core::actor::LifecycleOp::Unfreeze => {
621                    match lifecycle::execute_unfreeze(&self.actor_store, &pool, target_actor_id)
622                        .await
623                    {
624                        Ok(()) => {
625                            info!(target = %target_actor_id, "lifecycle: unfreeze executed");
626                        }
627                        Err(err) => {
628                            warn!(error = %err, "lifecycle: unfreeze failed (event already committed)");
629                        }
630                    }
631                }
632                punkgo_core::actor::LifecycleOp::Terminate { .. } => {
633                    // Terminate sets the actor to frozen permanently.
634                    // Orphan handling is a future enhancement — for now, children are cascade-frozen.
635                    match lifecycle::execute_freeze(&self.actor_store, &pool, target_actor_id).await
636                    {
637                        Ok(frozen_ids) => {
638                            info!(
639                                target = %target_actor_id,
640                                cascade_count = frozen_ids.len(),
641                                "lifecycle: terminate executed (cascade frozen)"
642                            );
643                        }
644                        Err(err) => {
645                            warn!(error = %err, "lifecycle: terminate failed (event already committed)");
646                        }
647                    }
648                }
649            }
650        }
651
652        Ok(SubmitReceipt {
653            event_id: event.id,
654            log_index: event.log_index,
655            event_hash: event.event_hash,
656            reserved_cost,
657            settled_cost,
658            artifact_hash,
659        })
660    }
661
662    async fn finalize_energy_and_event(
663        &self,
664        reservation: Option<&EnergyReservation>,
665        settled_cost: i64,
666        create_actor: Option<&CreateActorSpec>,
667        create_envelope: Option<&(String, EnvelopeSpec, Option<String>)>,
668        event: &mut EventRecord,
669    ) -> KernelResult<()> {
670        let pool = self.state_store.pool();
671        let mut tx = pool.begin().await?;
672
673        // Energy settlement (if reservation exists).
674        if let Some(res) = reservation {
675            self.energy_ledger
676                .settle_in_tx(&mut tx, &res.actor_id, res.reserved, settled_cost)
677                .await?;
678        }
679
680        // Actor creation (if applicable).
681        if let Some(spec) = create_actor {
682            self.energy_ledger
683                .create_actor_in_tx(&mut tx, &spec.actor_id, spec.energy_balance)
684                .await?;
685            self.actor_store.create_in_tx(&mut tx, spec).await?;
686            info!(
687                created_actor = %spec.actor_id,
688                actor_type = spec.actor_type.as_str(),
689                energy_balance = spec.energy_balance,
690                "actor created in transaction"
691            );
692        }
693
694        // Envelope creation (if applicable, whitepaper §3 invariant 6).
695        if let Some((envelope_id, spec, parent_id)) = create_envelope {
696            self.envelope_store
697                .create_in_tx(&mut tx, envelope_id, spec, parent_id.as_deref())
698                .await?;
699            info!(
700                envelope_id = %envelope_id,
701                actor_id = %spec.actor_id,
702                grantor_id = %spec.grantor_id,
703                budget = spec.budget,
704                "envelope created in transaction"
705            );
706        }
707
708        // Event append.
709        self.event_log.append_in_tx(&mut tx, event).await?;
710
711        // Audit trail update — atomic with event (whitepaper §3 invariant 5).
712        let log_index = event.log_index as u64;
713        let tree_size = log_index + 1;
714        self.audit_log
715            .append_leaf_in_tx(&mut tx, log_index, &event.event_hash)
716            .await
717            .map_err(|e| KernelError::Audit(e.to_string()))?;
718        self.audit_log
719            .make_checkpoint_in_tx(&mut tx, tree_size)
720            .await
721            .map_err(|e| KernelError::Audit(e.to_string()))?;
722
723        tx.commit().await?;
724        info!(event_id = %event.id, log_index = event.log_index, "event committed");
725        Ok(())
726    }
727
728    /// PIP-002 §2+§3: Validate execute action payload.
729    ///
730    /// The kernel MUST reject an execute submission that is missing any required
731    /// field or uses an invalid OID format. Returns the artifact_hash on success.
732    fn validate_execute_payload(payload: &Value) -> KernelResult<String> {
733        Self::require_valid_oid(payload, "input_oid")?;
734        Self::require_valid_oid(payload, "output_oid")?;
735
736        if payload.get("exit_code").and_then(|v| v.as_i64()).is_none() {
737            return Err(KernelError::ExecutePayloadInvalid(
738                "missing or invalid exit_code (must be integer)".to_string(),
739            ));
740        }
741
742        let artifact_hash = Self::require_valid_oid(payload, "artifact_hash")?;
743        Ok(artifact_hash)
744    }
745
746    /// Validate that a payload field exists and matches OID format: `sha256:<64 hex chars>`.
747    fn require_valid_oid(payload: &Value, field: &str) -> KernelResult<String> {
748        let val = payload
749            .get(field)
750            .and_then(|v| v.as_str())
751            .ok_or_else(|| KernelError::ExecutePayloadInvalid(format!("missing {field}")))?;
752
753        if !val.starts_with("sha256:") || val.len() != 71 {
754            return Err(KernelError::ExecutePayloadInvalid(format!(
755                "{field} must be sha256:<64 hex chars>, got: {val}"
756            )));
757        }
758        let hex_part = &val[7..];
759        if !hex_part.chars().all(|c| c.is_ascii_hexdigit()) {
760            return Err(KernelError::ExecutePayloadInvalid(format!(
761                "{field} contains non-hex characters: {val}"
762            )));
763        }
764
765        Ok(val.to_string())
766    }
767
768    async fn read_query(&self, query: ReadQuery) -> KernelResult<Value> {
769        // Visibility boundary gate for read queries.
770        // Currently all-transparent. When multi-user collaboration is
771        // introduced, check_read_access will enforce readable scope per
772        // requester. See PIP-001 §8 (writable_targets).
773        let requester = query.requester_id.as_deref().unwrap_or("anonymous");
774        check_read_access(requester, &query.kind)?;
775
776        match query.kind.as_str() {
777            "health" => Ok(json!({ "status": "ok" })),
778            "actor_energy" => {
779                let actor_id = query.actor_id.ok_or_else(|| {
780                    KernelError::InvalidRequest("actor_id is required for actor_energy".to_string())
781                })?;
782                let (energy_balance, reserved_energy) =
783                    self.energy_ledger.balance_view(&actor_id).await?;
784                Ok(json!({
785                    "actor_id": actor_id,
786                    "energy_balance": energy_balance,
787                    "reserved_energy": reserved_energy
788                }))
789            }
790            "events" => {
791                let limit = query.limit.unwrap_or(20).clamp(1, 500);
792                let events = self
793                    .event_log
794                    .query(
795                        query.actor_id.as_deref(),
796                        query.before_index,
797                        query.after_index,
798                        limit,
799                    )
800                    .await?;
801                // Pagination metadata: smallest log_index in result set.
802                // Client passes this as `before_index` to fetch the next page.
803                let next_cursor = events.last().map(|e| e.log_index);
804                let has_more = events.len() as i64 == limit;
805                Ok(json!({
806                    "events": events,
807                    "has_more": has_more,
808                    "next_cursor": next_cursor
809                }))
810            }
811            "stats" => {
812                let event_count = self.event_log.count().await?;
813                Ok(json!({ "event_count": event_count }))
814            }
815            "snapshot" => {
816                // Legacy: snapshot is superseded by audit checkpoint.
817                // Return audit checkpoint data for backward compatibility.
818                let event_count = self.event_log.count().await?;
819                let cp = self
820                    .audit_log
821                    .latest_checkpoint()
822                    .await
823                    .map_err(|e| KernelError::Audit(e.to_string()))?;
824                Ok(json!({
825                    "event_count": event_count,
826                    "snapshot_hash": cp.root_hash,
827                    "generated_at": cp.created_at
828                }))
829            }
830            "paths" => {
831                let paths = self.state_store.paths();
832                Ok(json!({
833                    "root": paths.root.display().to_string(),
834                    "workspace_root": paths.workspace_root.display().to_string(),
835                    "quarantine_root": paths.quarantine_root.display().to_string(),
836                    "db_path": paths.db_path.display().to_string()
837                }))
838            }
839            "audit_checkpoint" => {
840                let cp = self
841                    .audit_log
842                    .latest_checkpoint()
843                    .await
844                    .map_err(|e| KernelError::Audit(e.to_string()))?;
845                Ok(serde_json::to_value(cp)?)
846            }
847            "audit_inclusion_proof" => {
848                let log_index = query.log_index.ok_or_else(|| {
849                    KernelError::InvalidRequest(
850                        "log_index is required for audit_inclusion_proof".to_string(),
851                    )
852                })? as u64;
853                let tree_size = match query.tree_size {
854                    Some(s) => s as u64,
855                    None => self
856                        .audit_log
857                        .tree_size()
858                        .await
859                        .map_err(|e| KernelError::Audit(e.to_string()))?,
860                };
861                let proof = self
862                    .audit_log
863                    .inclusion_proof(log_index, tree_size)
864                    .await
865                    .map_err(|e| KernelError::Audit(e.to_string()))?;
866                Ok(json!({
867                    "log_index": log_index,
868                    "tree_size": tree_size,
869                    "proof": proof
870                }))
871            }
872            "audit_consistency_proof" => {
873                let old_size = query.old_size.ok_or_else(|| {
874                    KernelError::InvalidRequest(
875                        "old_size is required for audit_consistency_proof".to_string(),
876                    )
877                })? as u64;
878                let new_size = query.tree_size.ok_or_else(|| {
879                    KernelError::InvalidRequest(
880                        "tree_size is required for audit_consistency_proof".to_string(),
881                    )
882                })? as u64;
883                let proof = self
884                    .audit_log
885                    .consistency_proof(old_size, new_size)
886                    .await
887                    .map_err(|e| KernelError::Audit(e.to_string()))?;
888                Ok(json!({
889                    "old_size": old_size,
890                    "new_size": new_size,
891                    "proof": proof
892                }))
893            }
894            // Phase 2: stellar configuration read query.
895            "stellar_info" => {
896                let config = &self.stellar_config;
897                Ok(json!({
898                    "gpu_model": config.gpu_model,
899                    "cpu_model": config.cpu_model,
900                    "int8_tops": config.int8_tops,
901                    "energy_per_tick": config.effective_energy_per_tick(),
902                    "tick_interval_ms": config.tick_interval_ms,
903                    "luminosity_source": config.luminosity_source
904                }))
905            }
906            // Phase 4b: envelope_info read query — retrieve active envelope for an actor.
907            "envelope_info" => {
908                let actor_id = query.actor_id.ok_or_else(|| {
909                    KernelError::InvalidRequest(
910                        "actor_id is required for envelope_info".to_string(),
911                    )
912                })?;
913                let envelope = self.envelope_store.get_active_for_actor(&actor_id).await?;
914                match envelope {
915                    Some(record) => Ok(serde_json::to_value(record)?),
916                    None => Ok(json!({ "actor_id": actor_id, "envelope": null })),
917                }
918            }
919            // Phase 1: actor_info read query — retrieve actor record.
920            "actor_info" => {
921                let actor_id = query.actor_id.ok_or_else(|| {
922                    KernelError::InvalidRequest("actor_id is required for actor_info".to_string())
923                })?;
924                let actor = self.actor_store.get(&actor_id).await?;
925                match actor {
926                    Some(record) => Ok(serde_json::to_value(record)?),
927                    None => Err(KernelError::ActorNotFound(actor_id)),
928                }
929            }
930            // PIP-001 §11: hold_info — retrieve a single hold_request by hold_id.
931            "hold_info" => {
932                let hold_id = query.hold_id.ok_or_else(|| {
933                    KernelError::InvalidRequest("hold_id is required for hold_info".to_string())
934                })?;
935                let hold = self.envelope_store.get_hold_request(&hold_id).await?;
936                match hold {
937                    Some(record) => Ok(record),
938                    None => Err(KernelError::InvalidRequest(format!(
939                        "hold_request not found: {hold_id}"
940                    ))),
941                }
942            }
943            // PIP-001 §11: holds_pending — list pending hold_requests, optionally filtered by agent.
944            "holds_pending" => {
945                let holds = self
946                    .envelope_store
947                    .list_pending_holds(query.actor_id.as_deref())
948                    .await?;
949                Ok(json!({ "holds": holds }))
950            }
951            other => Err(KernelError::InvalidRequest(format!(
952                "unsupported read query kind: {other}"
953            ))),
954        }
955    }
956}
957
958fn now_millis_string() -> String {
959    let now = std::time::SystemTime::now()
960        .duration_since(std::time::UNIX_EPOCH)
961        .unwrap_or_default();
962    now.as_millis().to_string()
963}
964
965fn now_millis_u64() -> u64 {
966    std::time::SystemTime::now()
967        .duration_since(std::time::UNIX_EPOCH)
968        .unwrap_or_default()
969        .as_millis() as u64
970}
971
972/// If the action is a `system/policy` Create, extract the policy version string.
973/// Returns `None` for all other actions.
974fn parse_policy_version(action: &Action) -> Option<String> {
975    if !matches!(action.action_type, ActionType::Create) || action.target != "system/policy" {
976        return None;
977    }
978    action
979        .payload
980        .get("version")
981        .and_then(|v| v.as_str())
982        .map(|s| s.to_string())
983}
984
985/// Phase 1: Parse actor creation spec from a `ledger/actor` Create action.
986///
987/// Backward compatible: old format `{actor_id, energy_balance}` is supported
988/// by defaulting to actor_type=agent, purpose="legacy", writable_targets=[].
989///
990/// New format adds: actor_type, purpose, writable_targets, energy_share.
991async fn parse_create_actor_spec(
992    action: &Action,
993    actor_store: &ActorStore,
994) -> KernelResult<Option<CreateActorSpec>> {
995    if !matches!(action.action_type, ActionType::Create) || action.target != "ledger/actor" {
996        return Ok(None);
997    }
998
999    let payload = action.payload.as_object().ok_or_else(|| {
1000        KernelError::InvalidRequest("seed actor payload must be an object".to_string())
1001    })?;
1002
1003    // Required: actor_id (or derive from purpose)
1004    let explicit_id = payload.get("actor_id").and_then(Value::as_str);
1005    let purpose = payload
1006        .get("purpose")
1007        .and_then(Value::as_str)
1008        .unwrap_or("legacy");
1009    let energy_balance = payload
1010        .get("energy_balance")
1011        .and_then(Value::as_i64)
1012        .unwrap_or(1000);
1013
1014    // Actor type: default to "agent" for backward compat
1015    let actor_type_str = payload
1016        .get("actor_type")
1017        .and_then(Value::as_str)
1018        .unwrap_or("agent");
1019    let actor_type = ActorType::parse(actor_type_str).ok_or_else(|| {
1020        KernelError::InvalidRequest(format!("invalid actor_type: {actor_type_str}"))
1021    })?;
1022
1023    // Derive ID if not explicitly provided
1024    let actor_id = if let Some(id) = explicit_id {
1025        id.to_string()
1026    } else {
1027        if purpose == "legacy" {
1028            return Err(KernelError::InvalidRequest(
1029                "actor_id or purpose is required to create an actor".to_string(),
1030            ));
1031        }
1032        let seq = actor_store.next_sequence(&action.actor_id, purpose).await?;
1033        derive_agent_id(&action.actor_id, purpose, seq)
1034    };
1035
1036    // Build lineage from creator (PIP-001 §7) + enforce §5 (only humans create agents)
1037    let creator_record = actor_store.get(&action.actor_id).await?;
1038    let (creator_type, creator_lineage) = match &creator_record {
1039        Some(record) => (record.actor_type.clone(), record.lineage.clone()),
1040        // Fallback for legacy actors not yet in actors table
1041        None => (ActorType::Human, vec![]),
1042    };
1043
1044    // PIP-001 §5: Only Humans can create Agents. Reject Agent as creator.
1045    if creator_type == ActorType::Agent && actor_type == ActorType::Agent {
1046        return Err(KernelError::PolicyViolation(
1047            "agents cannot create agents — creation right belongs to humans (PIP-001 §5)"
1048                .to_string(),
1049        ));
1050    }
1051
1052    let lineage = build_lineage(&creator_type, &action.actor_id, &creator_lineage);
1053
1054    // Writable targets: parse from payload or default to empty
1055    let writable_targets: Vec<WritableTarget> = if let Some(targets_val) =
1056        payload.get("writable_targets")
1057    {
1058        serde_json::from_value(targets_val.clone())
1059            .map_err(|e| KernelError::InvalidRequest(format!("invalid writable_targets: {e}")))?
1060    } else {
1061        vec![]
1062    };
1063
1064    // Phase 3 PIP-001 §11: validate child targets are subset of creator's boundary.
1065    if !writable_targets.is_empty()
1066        && let Some(ref creator) = creator_record
1067    {
1068        validate_child_targets(creator, &writable_targets)?;
1069    }
1070
1071    // Energy share: default 0.0
1072    let energy_share = payload
1073        .get("energy_share")
1074        .and_then(Value::as_f64)
1075        .unwrap_or(0.0);
1076
1077    let reduction_policy = payload
1078        .get("reduction_policy")
1079        .and_then(Value::as_str)
1080        .unwrap_or("none")
1081        .to_string();
1082
1083    Ok(Some(CreateActorSpec {
1084        actor_id,
1085        actor_type,
1086        creator_id: action.actor_id.clone(),
1087        lineage,
1088        purpose: Some(purpose.to_string()),
1089        writable_targets,
1090        energy_balance,
1091        energy_share,
1092        reduction_policy,
1093    }))
1094}
1095
1096/// Phase 4b: Parse envelope creation spec from a `ledger/envelope` Create action.
1097///
1098/// Returns (envelope_id, EnvelopeSpec, parent_envelope_id) if applicable.
1099/// The grantor is always the action's actor_id (the human or parent agent granting
1100/// the envelope).
1101///
1102/// PIP-001 §11: If the grantor is an agent, their active envelope is used as the parent
1103/// and reduction validation is performed.
1104async fn parse_create_envelope_spec(
1105    action: &Action,
1106    envelope_store: &EnvelopeStore,
1107) -> KernelResult<Option<(String, EnvelopeSpec, Option<String>)>> {
1108    if !matches!(action.action_type, ActionType::Create) || action.target != "ledger/envelope" {
1109        return Ok(None);
1110    }
1111
1112    let payload = action.payload.as_object().ok_or_else(|| {
1113        KernelError::InvalidRequest("envelope payload must be an object".to_string())
1114    })?;
1115
1116    let actor_id = payload
1117        .get("actor_id")
1118        .and_then(Value::as_str)
1119        .ok_or_else(|| {
1120            KernelError::InvalidRequest("actor_id is required to create an envelope".to_string())
1121        })?
1122        .to_string();
1123
1124    let budget = payload
1125        .get("budget")
1126        .and_then(Value::as_i64)
1127        .ok_or_else(|| {
1128            KernelError::InvalidRequest("budget is required to create an envelope".to_string())
1129        })?;
1130
1131    if budget <= 0 {
1132        return Err(KernelError::InvalidRequest(
1133            "envelope budget must be positive".to_string(),
1134        ));
1135    }
1136
1137    let targets: Vec<String> = if let Some(targets_val) = payload.get("targets") {
1138        serde_json::from_value(targets_val.clone())
1139            .map_err(|e| KernelError::InvalidRequest(format!("invalid envelope targets: {e}")))?
1140    } else {
1141        return Err(KernelError::InvalidRequest(
1142            "targets are required to create an envelope".to_string(),
1143        ));
1144    };
1145
1146    let actions: Vec<String> = if let Some(actions_val) = payload.get("actions") {
1147        serde_json::from_value(actions_val.clone())
1148            .map_err(|e| KernelError::InvalidRequest(format!("invalid envelope actions: {e}")))?
1149    } else {
1150        return Err(KernelError::InvalidRequest(
1151            "actions are required to create an envelope".to_string(),
1152        ));
1153    };
1154
1155    let duration_secs = payload.get("duration_secs").and_then(Value::as_i64);
1156    let report_every = payload.get("report_every").and_then(Value::as_i64);
1157    let hold_timeout_secs = payload.get("hold_timeout_secs").and_then(Value::as_i64);
1158
1159    // PIP-001 §11a: Parse hold_on rules.
1160    let hold_on: Vec<HoldRule> = if let Some(hold_on_val) = payload.get("hold_on") {
1161        serde_json::from_value(hold_on_val.clone())
1162            .map_err(|e| KernelError::InvalidRequest(format!("invalid hold_on rules: {e}")))?
1163    } else {
1164        vec![]
1165    };
1166
1167    let spec = EnvelopeSpec {
1168        actor_id,
1169        grantor_id: action.actor_id.clone(),
1170        budget,
1171        targets,
1172        actions,
1173        duration_secs,
1174        report_every,
1175        hold_on,
1176        hold_timeout_secs,
1177    };
1178
1179    // PIP-001 §11: If grantor is an agent with an active envelope, validate reduction.
1180    let parent_envelope_id = if let Some(grantor_envelope) = envelope_store
1181        .get_active_for_actor(&action.actor_id)
1182        .await?
1183    {
1184        consent::validate_envelope_reduction(&grantor_envelope, &spec)?;
1185        Some(grantor_envelope.envelope_id)
1186    } else {
1187        None
1188    };
1189
1190    let envelope_id = Uuid::new_v4().to_string();
1191
1192    Ok(Some((envelope_id, spec, parent_envelope_id)))
1193}
1194
1195// ---------------------------------------------------------------------------
1196// PIP-001 §11d: Hold response parsing and execution
1197// ---------------------------------------------------------------------------
1198
1199/// Parse a `ledger/hold/<hold_id>` mutate target into (hold_id, decision, instruction).
1200///
1201/// Returns `None` for any other target.
1202fn parse_hold_response(target: &str, payload: &Value) -> Option<(String, String, Option<String>)> {
1203    // Pattern: "ledger/hold/<hold_id>"
1204    let rest = target.strip_prefix("ledger/hold/")?;
1205    // Ensure there is a non-empty hold_id segment and nothing after it.
1206    if rest.is_empty() || rest.contains('/') {
1207        return None;
1208    }
1209    let hold_id = rest.to_string();
1210
1211    let decision = payload.get("decision").and_then(Value::as_str)?.to_string();
1212
1213    // Validate decision value.
1214    if decision != "approve" && decision != "reject" {
1215        return None;
1216    }
1217
1218    let instruction = payload
1219        .get("instruction")
1220        .and_then(Value::as_str)
1221        .map(|s| s.to_string());
1222
1223    Some((hold_id, decision, instruction))
1224}
1225
1226impl Kernel {
1227    /// PIP-001 §11d: Execute a hold_response (approve or reject).
1228    ///
1229    /// Validates that:
1230    /// 1. The caller is a Human actor (whitepaper §2: Human sovereignty).
1231    /// 2. The hold_request exists and is still pending.
1232    /// 3. The hold envelope belongs to an Agent actor.
1233    ///
1234    /// On approve: re-executes the pending action and writes a `hold_response` event.
1235    /// On reject: discards the pending action and writes a `hold_response` event.
1236    async fn execute_hold_response(
1237        &self,
1238        human_action: &Action,
1239        hold_id: &str,
1240        decision: &str,
1241        instruction: Option<&str>,
1242    ) -> KernelResult<SubmitReceipt> {
1243        // --- Validate caller is Human ---
1244        let caller = self
1245            .actor_store
1246            .get(&human_action.actor_id)
1247            .await?
1248            .ok_or_else(|| KernelError::ActorNotFound(human_action.actor_id.clone()))?;
1249
1250        if caller.actor_type != punkgo_core::actor::ActorType::Human {
1251            return Err(KernelError::PolicyViolation(format!(
1252                "only Human actors can resolve holds; caller {} is {:?}",
1253                human_action.actor_id, caller.actor_type
1254            )));
1255        }
1256
1257        // --- Load hold_request ---
1258        let hold_record = self
1259            .envelope_store
1260            .get_hold_request(hold_id)
1261            .await?
1262            .ok_or_else(|| {
1263                KernelError::PolicyViolation(format!("hold_request not found: {hold_id}"))
1264            })?;
1265
1266        let _envelope_id = hold_record
1267            .get("envelope_id")
1268            .and_then(Value::as_str)
1269            .ok_or_else(|| {
1270                KernelError::PolicyViolation("hold_request missing envelope_id".to_string())
1271            })?
1272            .to_string();
1273
1274        let agent_id = hold_record
1275            .get("agent_id")
1276            .and_then(Value::as_str)
1277            .ok_or_else(|| {
1278                KernelError::PolicyViolation("hold_request missing agent_id".to_string())
1279            })?
1280            .to_string();
1281
1282        let pending_payload = hold_record
1283            .get("pending_payload")
1284            .cloned()
1285            .unwrap_or(Value::Null);
1286
1287        let trigger_target = hold_record
1288            .get("trigger_target")
1289            .and_then(Value::as_str)
1290            .unwrap_or("")
1291            .to_string();
1292
1293        let trigger_action = hold_record
1294            .get("trigger_action")
1295            .and_then(Value::as_str)
1296            .unwrap_or("")
1297            .to_string();
1298
1299        let status = hold_record
1300            .get("status")
1301            .and_then(Value::as_str)
1302            .unwrap_or("");
1303        if status != "pending" {
1304            return Err(KernelError::PolicyViolation(format!(
1305                "hold_request {hold_id} is already resolved (status={status})"
1306            )));
1307        }
1308
1309        // PIP-001 §11e: Check if this hold has already timed out.
1310        // If so, run the lazy expiry first — it will auto-reject the hold.
1311        let envelope_id_str = hold_record
1312            .get("envelope_id")
1313            .and_then(Value::as_str)
1314            .unwrap_or("")
1315            .to_string();
1316        if let Ok(Some(env)) = self.envelope_store.get(&envelope_id_str).await {
1317            if let Some(timeout_secs) = env.hold_timeout_secs {
1318                if timeout_secs > 0 {
1319                    let triggered_at: u64 = hold_record
1320                        .get("triggered_at")
1321                        .and_then(|v| v.as_str())
1322                        .and_then(|s| s.parse().ok())
1323                        .unwrap_or(0);
1324                    let now = now_millis_u64();
1325                    if now.saturating_sub(triggered_at) > (timeout_secs as u64) * 1000 {
1326                        // Expire this hold first, then return error.
1327                        self.expire_timed_out_holds(&envelope_id_str, timeout_secs)
1328                            .await?;
1329                        return Err(KernelError::PolicyViolation(format!(
1330                            "hold_request {hold_id} has timed out and was auto-rejected"
1331                        )));
1332                    }
1333                }
1334            }
1335        }
1336
1337        // --- Build hold_response event payload (PIP-001 §11d) ---
1338        let response_payload = json!({
1339            "hold_id": hold_id,
1340            "agent_id": &agent_id,
1341            "decision": decision,
1342            "instruction": instruction,
1343            "trigger": {
1344                "target": &trigger_target,
1345                "action_type": &trigger_action
1346            },
1347            "resolved_by": &human_action.actor_id,
1348            "resolved_at": now_millis_string()
1349        });
1350
1351        let response_action = Action {
1352            actor_id: human_action.actor_id.clone(),
1353            action_type: ActionType::Mutate,
1354            target: format!("ledger/hold/{hold_id}"),
1355            payload: response_payload.clone(),
1356            timestamp: None,
1357        };
1358
1359        // --- Atomic transaction: resolve hold_request + settle energy + write event ---
1360        // Envelope stays Active throughout — no status change needed (PIP-001 §11b).
1361        let pool = self.state_store.pool();
1362        let mut tx = pool.begin().await?;
1363
1364        self.envelope_store
1365            .resolve_hold_request_in_tx(&mut tx, hold_id, decision, instruction)
1366            .await?;
1367
1368        // PIP-001 §11f: On reject, settle commitment cost (20% of reserved).
1369        // settle_in_tx(reserved=full, actual=commitment) releases the rest.
1370        let hold_reserved_cost = pending_payload
1371            .get("reserved_cost")
1372            .and_then(|v| v.as_i64())
1373            .unwrap_or(0);
1374
1375        let commitment_cost = if decision == "reject" && hold_reserved_cost > 0 {
1376            let cost = ((hold_reserved_cost as f64) * 0.2).ceil() as i64;
1377            self.energy_ledger
1378                .settle_in_tx(&mut tx, &agent_id, hold_reserved_cost, cost)
1379                .await?;
1380            cost
1381        } else {
1382            0
1383        };
1384
1385        let mut response_event = EventRecord {
1386            id: Uuid::new_v4().to_string(),
1387            log_index: 0,
1388            event_hash: String::new(),
1389            actor_id: human_action.actor_id.clone(),
1390            action_type: "hold_response".to_string(),
1391            target: format!("ledger/hold/{hold_id}"),
1392            payload: response_payload.clone(),
1393            payload_hash: payload_hash_hex(&response_action)?,
1394            artifact_hash: None,
1395            reserved_energy: hold_reserved_cost,
1396            settled_energy: commitment_cost,
1397            timestamp: now_millis_string(),
1398        };
1399        self.event_log
1400            .append_in_tx(&mut tx, &mut response_event)
1401            .await?;
1402
1403        // Audit trail — atomic with event (whitepaper §3 invariant 5).
1404        let log_index = response_event.log_index as u64;
1405        let tree_size = log_index + 1;
1406        self.audit_log
1407            .append_leaf_in_tx(&mut tx, log_index, &response_event.event_hash)
1408            .await
1409            .map_err(|e| KernelError::Audit(e.to_string()))?;
1410        self.audit_log
1411            .make_checkpoint_in_tx(&mut tx, tree_size)
1412            .await
1413            .map_err(|e| KernelError::Audit(e.to_string()))?;
1414
1415        tx.commit().await?;
1416
1417        info!(
1418            hold_id = %hold_id,
1419            decision = %decision,
1420            agent_id = %agent_id,
1421            resolved_by = %human_action.actor_id,
1422            "PIP-001 §11d: hold resolved"
1423        );
1424
1425        // --- Approve: re-execute the pending action (PIP-001 §11d) ---
1426        if decision == "approve" {
1427            // Reconstruct the original action from the pending_payload stored in hold_request.
1428            let orig_target = pending_payload
1429                .get("target")
1430                .and_then(Value::as_str)
1431                .unwrap_or(&trigger_target)
1432                .to_string();
1433            let orig_action_type_str = pending_payload
1434                .get("action_type")
1435                .and_then(Value::as_str)
1436                .unwrap_or(&trigger_action)
1437                .to_string();
1438            let mut orig_payload = pending_payload
1439                .get("payload")
1440                .cloned()
1441                .unwrap_or(Value::Null);
1442
1443            // PIP-001 §11d: Energy was already reserved at hold trigger time.
1444            // `reserved_cost` was stored in pending_payload by the hold trigger.
1445            // Inject `_hold_reserved_cost` so submit_action skips double quote/reserve.
1446
1447            // Mark the payload as hold-approved so the re-execution bypasses the hold check (PIP-001 §11d).
1448            // This sentinel prevents a re-trigger loop.
1449            if let Some(obj) = orig_payload.as_object_mut() {
1450                obj.insert("_hold_approved".to_string(), Value::Bool(true));
1451                obj.insert(
1452                    "_hold_reserved_cost".to_string(),
1453                    Value::Number(hold_reserved_cost.into()),
1454                );
1455                // PIP-001 §11d: If instruction is present, append it to the payload.
1456                if let Some(instr) = instruction {
1457                    obj.insert(
1458                        "_hold_instruction".to_string(),
1459                        Value::String(instr.to_string()),
1460                    );
1461                }
1462            } else {
1463                // If payload is not an object (e.g. null), wrap it.
1464                let mut obj = serde_json::Map::new();
1465                obj.insert("_hold_approved".to_string(), Value::Bool(true));
1466                obj.insert(
1467                    "_hold_reserved_cost".to_string(),
1468                    Value::Number(hold_reserved_cost.into()),
1469                );
1470                if let Some(instr) = instruction {
1471                    obj.insert(
1472                        "_hold_instruction".to_string(),
1473                        Value::String(instr.to_string()),
1474                    );
1475                }
1476                orig_payload = Value::Object(obj);
1477            }
1478
1479            let orig_action_type = match orig_action_type_str.as_str() {
1480                "observe" => ActionType::Observe,
1481                "create" => ActionType::Create,
1482                "mutate" => ActionType::Mutate,
1483                _ => ActionType::Execute,
1484            };
1485
1486            let pending_action = Action {
1487                actor_id: agent_id.clone(),
1488                action_type: orig_action_type,
1489                target: orig_target,
1490                payload: orig_payload,
1491                timestamp: None,
1492            };
1493
1494            info!(
1495                hold_id = %hold_id,
1496                agent_id = %agent_id,
1497                "PIP-001 §11d: re-executing approved pending action"
1498            );
1499
1500            // Re-submit the pending action on behalf of the agent.
1501            // This goes through the full pipeline (quota → reserve → execute → settle → append).
1502            // Use Box::pin to break the async recursion (submit_action → execute_hold_response → submit_action).
1503            return Box::pin(self.submit_action(pending_action)).await;
1504        }
1505
1506        // --- Reject: return a receipt with commitment cost (PIP-001 §11f) ---
1507        Ok(SubmitReceipt {
1508            event_id: response_event.id,
1509            log_index: response_event.log_index,
1510            event_hash: response_event.event_hash,
1511            reserved_cost: hold_reserved_cost,
1512            settled_cost: commitment_cost,
1513            artifact_hash: None,
1514        })
1515    }
1516
1517    /// PIP-001 §11e: Lazy expiry for timed-out holds.
1518    ///
1519    /// Called before hold-trigger checks in submit_action. Scans pending holds
1520    /// for the given envelope and auto-rejects any that have exceeded
1521    /// `hold_timeout_secs`. Same pattern as `is_envelope_expired()` — no
1522    /// background thread, just check-on-access.
1523    async fn expire_timed_out_holds(
1524        &self,
1525        envelope_id: &str,
1526        hold_timeout_secs: i64,
1527    ) -> KernelResult<()> {
1528        let holds = self
1529            .envelope_store
1530            .list_pending_holds_for_envelope(envelope_id)
1531            .await?;
1532        let now = now_millis_u64();
1533
1534        for hold in holds {
1535            let triggered_at: u64 = hold
1536                .get("triggered_at")
1537                .and_then(|v| v.as_str())
1538                .and_then(|s| s.parse().ok())
1539                .unwrap_or(0);
1540
1541            if now.saturating_sub(triggered_at) <= (hold_timeout_secs as u64) * 1000 {
1542                continue;
1543            }
1544
1545            // Timed out — treat as reject (PIP-001 §11e/§11f).
1546            let hold_id = hold.get("hold_id").and_then(|v| v.as_str()).unwrap_or("");
1547            let agent_id = hold.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
1548            let pending_payload = hold.get("pending_payload").cloned().unwrap_or(Value::Null);
1549            let reserved_cost = pending_payload
1550                .get("reserved_cost")
1551                .and_then(|v| v.as_i64())
1552                .unwrap_or(0);
1553
1554            let commitment_cost = if reserved_cost > 0 {
1555                ((reserved_cost as f64) * 0.2).ceil() as i64
1556            } else {
1557                0
1558            };
1559
1560            let timeout_payload = json!({
1561                "hold_id": hold_id,
1562                "agent_id": agent_id,
1563                "decision": "timed_out",
1564                "reserved_cost": reserved_cost,
1565                "commitment_cost": commitment_cost,
1566                "triggered_at": triggered_at.to_string(),
1567                "expired_at": now.to_string()
1568            });
1569            let timeout_action = Action {
1570                actor_id: agent_id.to_string(),
1571                action_type: ActionType::Mutate,
1572                target: format!("ledger/hold/{hold_id}"),
1573                payload: timeout_payload.clone(),
1574                timestamp: None,
1575            };
1576            let mut timeout_event = EventRecord {
1577                id: Uuid::new_v4().to_string(),
1578                log_index: 0,
1579                event_hash: String::new(),
1580                actor_id: agent_id.to_string(),
1581                action_type: "hold_timeout".to_string(),
1582                target: format!("ledger/hold/{hold_id}"),
1583                payload: timeout_payload,
1584                payload_hash: payload_hash_hex(&timeout_action)?,
1585                artifact_hash: None,
1586                reserved_energy: reserved_cost,
1587                settled_energy: commitment_cost,
1588                timestamp: now_millis_string(),
1589            };
1590
1591            let pool = self.state_store.pool();
1592            let mut tx = pool.begin().await?;
1593            self.envelope_store
1594                .resolve_hold_request_in_tx(&mut tx, hold_id, "timed_out", None)
1595                .await?;
1596            if reserved_cost > 0 {
1597                self.energy_ledger
1598                    .settle_in_tx(&mut tx, agent_id, reserved_cost, commitment_cost)
1599                    .await?;
1600            }
1601            self.event_log
1602                .append_in_tx(&mut tx, &mut timeout_event)
1603                .await?;
1604
1605            // Audit trail — atomic with event (whitepaper §3 invariant 5).
1606            let t_log_index = timeout_event.log_index as u64;
1607            let t_tree_size = t_log_index + 1;
1608            self.audit_log
1609                .append_leaf_in_tx(&mut tx, t_log_index, &timeout_event.event_hash)
1610                .await
1611                .map_err(|e| KernelError::Audit(e.to_string()))?;
1612            self.audit_log
1613                .make_checkpoint_in_tx(&mut tx, t_tree_size)
1614                .await
1615                .map_err(|e| KernelError::Audit(e.to_string()))?;
1616
1617            tx.commit().await?;
1618
1619            info!(
1620                hold_id = %hold_id,
1621                agent_id = %agent_id,
1622                commitment_cost = commitment_cost,
1623                "PIP-001 §11e: hold auto-expired (timeout)"
1624            );
1625        }
1626        Ok(())
1627    }
1628}