Skip to main content

punkgo_kernel/runtime/
kernel.rs

1use std::path::PathBuf;
2
3use serde::{Deserialize, Serialize};
4use serde_json::{Value, json};
5use tracing::{info, warn};
6use uuid::Uuid;
7
8use crate::audit::AuditLog;
9use punkgo_core::action::{Action, ActionType, payload_hash_hex, quote_cost};
10use punkgo_core::actor::{
11    ActorType, CreateActorSpec, WritableTarget, build_lineage, derive_agent_id,
12};
13use punkgo_core::boundary::{check_writable_boundary, validate_child_targets};
14use punkgo_core::consent::{self, AuthorizationMode, CheckpointLevel, EnvelopeSpec, HoldRule};
15use punkgo_core::errors::{KernelError, KernelResult};
16use punkgo_core::policy::{check_read_access, validate_action};
17
18use super::lifecycle;
19use crate::state::{
20    ActorStore, EnergyLedger, EnergyReservation, EnvelopeStore, EventLog, EventRecord,
21    NewHoldRequest, StateStore,
22};
23use punkgo_core::protocol::{RequestEnvelope, RequestType, ResponseEnvelope};
24use punkgo_core::stellar::{StellarConfig, load_stellar_config};
25
26/// Configuration for bootstrapping the kernel.
27#[derive(Debug, Clone)]
28pub struct KernelConfig {
29    pub state_dir: PathBuf,
30    pub ipc_endpoint: String,
31}
32
33impl Default for KernelConfig {
34    fn default() -> Self {
35        let state_dir = std::env::var("PUNKGO_STATE_DIR")
36            .map(PathBuf::from)
37            .unwrap_or_else(|_| default_state_dir());
38        Self {
39            state_dir,
40            ipc_endpoint: "punkgo-kernel".to_string(),
41        }
42    }
43}
44
45/// Default state directory: ~/.punkgo/state
46///
47/// Falls back to `./state` if home directory cannot be determined.
48fn default_state_dir() -> PathBuf {
49    if let Some(home) = home_dir() {
50        return home.join(".punkgo").join("state");
51    }
52    PathBuf::from("state")
53}
54
55fn home_dir() -> Option<PathBuf> {
56    // Unix / WSL
57    if let Some(home) = std::env::var_os("HOME") {
58        return Some(PathBuf::from(home));
59    }
60    // Windows primary
61    if let Some(profile) = std::env::var_os("USERPROFILE") {
62        return Some(PathBuf::from(profile));
63    }
64    // Windows fallback
65    let drive = std::env::var_os("HOMEDRIVE")?;
66    let path = std::env::var_os("HOMEPATH")?;
67    let mut p = PathBuf::from(drive);
68    p.push(path);
69    Some(p)
70}
71
72/// Cryptographic receipt returned after a successful action submission.
73///
74/// Contains the event ID, log index, event hash, and energy costs.
75/// This is the caller's proof that their action was committed to the
76/// append-only history.
77#[derive(Debug, Clone, Serialize)]
78pub struct SubmitReceipt {
79    pub event_id: String,
80    pub log_index: i64,
81    pub event_hash: String,
82    pub reserved_cost: i64,
83    pub settled_cost: i64,
84    pub artifact_hash: Option<String>,
85}
86
87#[derive(Debug, Deserialize)]
88struct ReadQuery {
89    kind: String,
90    actor_id: Option<String>,
91    /// PIP-001 §11: hold_id for hold_info queries.
92    #[serde(default)]
93    hold_id: Option<String>,
94    limit: Option<i64>,
95    /// For audit_inclusion_proof: the event's log_index to prove.
96    log_index: Option<i64>,
97    /// For audit_inclusion_proof / audit_consistency_proof: the tree size to prove against.
98    tree_size: Option<i64>,
99    /// For audit_consistency_proof: the older tree size.
100    old_size: Option<i64>,
101    /// Pagination: only return events with log_index < this value.
102    /// Used for backward pagination (fetching older events).
103    #[serde(default)]
104    before_index: Option<i64>,
105    /// Pagination: only return events with log_index > this value.
106    /// Used for forward pagination (fetching newer events).
107    #[serde(default)]
108    after_index: Option<i64>,
109    /// Identity of the requester. Used by visibility boundary checks.
110    /// Currently optional — when absent, all reads are permitted (single-user).
111    /// When multi-user collaboration is introduced, this becomes required.
112    #[serde(default)]
113    requester_id: Option<String>,
114}
115
116/// The PunkGo kernel — single-writer append-only event system.
117///
118/// The kernel is the **committer** (whitepaper §2): a structural role that
119/// provides a single linearization point for actions. It is not a judge.
120///
121/// Use [`Kernel::bootstrap`] to initialize from a state directory, then
122/// [`Kernel::handle_request`] to process IPC requests.
123pub struct Kernel {
124    state_store: StateStore,
125    energy_ledger: EnergyLedger,
126    event_log: EventLog,
127    audit_log: AuditLog,
128    actor_store: ActorStore,
129    envelope_store: EnvelopeStore,
130    stellar_config: StellarConfig,
131}
132
133impl Kernel {
134    /// Initialize the kernel from a state directory.
135    ///
136    /// Creates the SQLite database, loads stellar config, and prepares the root actor.
137    pub async fn bootstrap(config: &KernelConfig) -> KernelResult<Self> {
138        let state_store = StateStore::bootstrap(&config.state_dir).await?;
139        let energy_ledger = EnergyLedger::new(state_store.pool());
140        let event_log = EventLog::new(state_store.pool());
141        let audit_log = AuditLog::new(state_store.pool(), "punkgo/kernel");
142        let actor_store = ActorStore::new(state_store.pool());
143        let envelope_store = EnvelopeStore::new(state_store.pool());
144
145        // Phase 2: Load stellar configuration (PIP-001 §1).
146        let stellar_config_path = config.state_dir.join("stellar.toml");
147        let stellar_config = load_stellar_config(&stellar_config_path)?;
148        info!(
149            energy_per_tick = stellar_config.effective_energy_per_tick(),
150            int8_tops = stellar_config.int8_tops,
151            tick_interval_ms = stellar_config.tick_interval_ms,
152            "stellar configuration loaded"
153        );
154
155        Ok(Self {
156            state_store,
157            energy_ledger,
158            event_log,
159            audit_log,
160            actor_store,
161            envelope_store,
162            stellar_config,
163        })
164    }
165
166    /// Returns a reference to the stellar configuration for external use
167    /// (e.g., energy producer startup).
168    pub fn stellar_config(&self) -> &StellarConfig {
169        &self.stellar_config
170    }
171
172    /// Returns a clone of the energy ledger for external use (e.g., energy producer).
173    pub fn energy_ledger(&self) -> &EnergyLedger {
174        &self.energy_ledger
175    }
176
177    /// Returns a clone of the actor store for external use (e.g., energy producer).
178    pub fn actor_store(&self) -> &ActorStore {
179        &self.actor_store
180    }
181
182    /// Returns a reference to the envelope store for external use.
183    pub fn envelope_store(&self) -> &EnvelopeStore {
184        &self.envelope_store
185    }
186
187    /// Returns the SQLite pool for external use (e.g., energy producer transaction).
188    pub fn pool(&self) -> sqlx::SqlitePool {
189        self.state_store.pool()
190    }
191
192    /// Handle an incoming IPC request — dispatches to quote, submit, or read.
193    pub async fn handle_request(&self, req: RequestEnvelope) -> ResponseEnvelope {
194        let request_id = req.request_id.clone();
195        info!(
196            request_id = %request_id,
197            request_type = ?req.request_type,
198            "received request"
199        );
200        match self.dispatch(req).await {
201            Ok(payload) => {
202                info!(request_id = %request_id, "request completed");
203                ResponseEnvelope::ok(request_id, payload)
204            }
205            Err(err) => {
206                warn!(error = %err, "request failed");
207                ResponseEnvelope::err_structured(request_id, &err)
208            }
209        }
210    }
211
212    async fn dispatch(&self, req: RequestEnvelope) -> KernelResult<Value> {
213        match req.request_type {
214            RequestType::Quote => {
215                let action: Action = serde_json::from_value(req.payload)?;
216                validate_action(&action)?;
217                let cost = quote_cost(&action);
218                Ok(json!({ "cost": cost }))
219            }
220            RequestType::Submit => {
221                let action: Action = serde_json::from_value(req.payload)?;
222                let receipt = self.submit_action(action).await?;
223                Ok(serde_json::to_value(receipt)?)
224            }
225            RequestType::Read => {
226                let query: ReadQuery = serde_json::from_value(req.payload)?;
227                self.read_query(query).await
228            }
229        }
230    }
231
232    async fn submit_action(&self, action: Action) -> KernelResult<SubmitReceipt> {
233        // Step 1: VALIDATE — policy + actor existence + frozen check
234        validate_action(&action)?;
235        if !self.state_store.actor_exists(&action.actor_id).await? {
236            return Err(KernelError::ActorNotFound(action.actor_id.clone()));
237        }
238
239        // Phase 1: check frozen status via actors table.
240        // Phase 3: check writable boundary (PIP-001 §8/§9).
241        // Phase 4a: check lineage activity (PIP-001 §7) + lifecycle ops.
242        if let Some(actor) = self.actor_store.get(&action.actor_id).await? {
243            if actor.status == punkgo_core::actor::ActorStatus::Frozen
244                && action.action_type.is_state_changing()
245            {
246                return Err(KernelError::ActorFrozen(format!(
247                    "actor {} is frozen and cannot perform state-changing actions",
248                    action.actor_id
249                )));
250            }
251            // Phase 3: Boundary enforcement — check writable_targets.
252            check_writable_boundary(&actor, &action.target, &action.action_type)?;
253
254            // Phase 4a: Check lineage activity for agents (PIP-001 §7).
255            // Agents need their entire creation chain to be active.
256            if actor.actor_type == punkgo_core::actor::ActorType::Agent
257                && action.action_type.is_state_changing()
258            {
259                lifecycle::check_lineage_active(&self.actor_store, &actor.lineage).await?;
260            }
261
262            // Phase 4b: Authorization check (PIP-001 §5/§6/§11).
263            // Agents performing state-changing actions need an active envelope.
264            if action.action_type.is_state_changing() {
265                let envelope = self
266                    .envelope_store
267                    .get_active_for_actor(&action.actor_id)
268                    .await?;
269
270                // Lazy expiry check (PIP-001 §11 checkpoint): if envelope exists but is time-expired, mark it.
271                let envelope = if let Some(env) = envelope {
272                    if consent::is_envelope_expired(&env, now_millis_u64()) {
273                        self.envelope_store
274                            .set_status(&env.envelope_id, &consent::EnvelopeStatus::Expired)
275                            .await?;
276                        None
277                    } else {
278                        Some(env)
279                    }
280                } else {
281                    None
282                };
283
284                let auth_mode = consent::check_authorization(&actor, envelope.as_ref())?;
285
286                // For ManOnTheLoop, also verify envelope covers this specific action.
287                if auth_mode == AuthorizationMode::ManOnTheLoop
288                    && let Some(ref env) = envelope
289                {
290                    consent::check_envelope_covers(
291                        env,
292                        &action.target,
293                        action.action_type.as_str(),
294                    )?;
295
296                    // PIP-001 §11e: Lazy-expire timed-out holds before checking new triggers.
297                    if !env.hold_on.is_empty() {
298                        if let Some(timeout_secs) = env.hold_timeout_secs {
299                            if timeout_secs > 0 {
300                                self.expire_timed_out_holds(&env.envelope_id, timeout_secs)
301                                    .await?;
302                            }
303                        }
304                    }
305
306                    // PIP-001 §11b: Check hold_on rules — if triggered, create a hold_request
307                    // event and reserve energy before execution.
308                    // Skip the hold check if this action was already approved by a Human
309                    // (re-execution after approve; PIP-001 §11d).
310                    let hold_approved = action
311                        .payload
312                        .get("_hold_approved")
313                        .and_then(|v| v.as_bool())
314                        .unwrap_or(false);
315                    if !hold_approved
316                        && consent::check_hold_trigger(
317                            &env.hold_on,
318                            &action.target,
319                            action.action_type.as_str(),
320                        )
321                    {
322                        let hold_id = Uuid::new_v4().to_string();
323
324                        // PIP-001 §11c: Commit = reserve energy (gasLimit model).
325                        // Reserves energy upfront so agent can't overspend while hold is pending.
326                        let reserved_cost = quote_cost(&action) as i64;
327
328                        let hold_payload = json!({
329                            "hold_id": &hold_id,
330                            "agent_id": &action.actor_id,
331                            "trigger": {
332                                "target": &action.target,
333                                "action_type": action.action_type.as_str()
334                            },
335                            "pending_action": {
336                                "target": &action.target,
337                                "action_type": action.action_type.as_str(),
338                                "payload": &action.payload
339                            },
340                            "reserved_cost": reserved_cost,
341                            "triggered_at": now_millis_string()
342                        });
343
344                        // Write hold_request event into history (PIP-001 §11b, whitepaper §3 inv 6).
345                        let hold_action = Action {
346                            actor_id: action.actor_id.clone(),
347                            action_type: ActionType::Mutate,
348                            target: format!("ledger/hold/{hold_id}"),
349                            payload: hold_payload.clone(),
350                            timestamp: None,
351                        };
352                        let mut hold_event = EventRecord {
353                            id: Uuid::new_v4().to_string(),
354                            log_index: 0,
355                            event_hash: String::new(),
356                            actor_id: action.actor_id.clone(),
357                            action_type: "hold_request".to_string(),
358                            target: format!("ledger/hold/{hold_id}"),
359                            payload: hold_payload.clone(),
360                            payload_hash: payload_hash_hex(&hold_action)?,
361                            artifact_hash: None,
362                            reserved_energy: reserved_cost,
363                            settled_energy: 0,
364                            timestamp: now_millis_string(),
365                        };
366
367                        // Atomic: reserve + hold_request + event in one transaction (PIP-001 §11b/§11c).
368                        let pool = self.state_store.pool();
369                        let mut tx = pool.begin().await?;
370                        if reserved_cost > 0 {
371                            self.energy_ledger
372                                .reserve_in_tx(&mut tx, &action.actor_id, reserved_cost)
373                                .await?;
374                        }
375                        self.event_log
376                            .append_in_tx(&mut tx, &mut hold_event)
377                            .await?;
378                        self.envelope_store
379                            .create_hold_request_in_tx(
380                                &mut tx,
381                                &NewHoldRequest {
382                                    hold_id: &hold_id,
383                                    envelope_id: &env.envelope_id,
384                                    agent_id: &action.actor_id,
385                                    trigger_target: &action.target,
386                                    trigger_action: action.action_type.as_str(),
387                                    pending_payload: &json!({
388                                        "target": &action.target,
389                                        "action_type": action.action_type.as_str(),
390                                        "payload": &action.payload,
391                                        "reserved_cost": reserved_cost
392                                    }),
393                                },
394                            )
395                            .await?;
396                        // Envelope stays Active — agent can continue submitting (PIP-001 §11b).
397
398                        // Audit trail — atomic with event (whitepaper §3 invariant 5).
399                        let log_index = hold_event.log_index as u64;
400                        self.audit_log
401                            .append_leaf_in_tx(&mut tx, log_index, &hold_event.event_hash)
402                            .await
403                            .map_err(|e| KernelError::Audit(e.to_string()))?;
404                        // Checkpoint generated lazily on read, not here.
405                        tx.commit().await?;
406
407                        return Err(KernelError::HoldTriggered {
408                            hold_id,
409                            agent_id: action.actor_id.clone(),
410                        });
411                    }
412                }
413            }
414        }
415
416        // PIP-001 §11d: Check for hold_response (approve/reject) operations.
417        // Pattern: mutate "ledger/hold/<hold_id>" with payload { decision, instruction? }
418        let hold_response = if matches!(action.action_type, ActionType::Mutate) {
419            parse_hold_response(&action.target, &action.payload)
420        } else {
421            None
422        };
423
424        // Validate and execute hold_response before entering the normal pipeline.
425        if let Some((ref hold_id, ref decision, ref instruction)) = hold_response {
426            return self
427                .execute_hold_response(&action, hold_id, decision, instruction.as_deref())
428                .await;
429        }
430
431        // Phase 4a: Check for lifecycle operations (freeze/unfreeze/terminate).
432        let lifecycle_op = if matches!(action.action_type, ActionType::Mutate) {
433            lifecycle::parse_lifecycle_op(&action.target, &action.payload)
434        } else {
435            None
436        };
437
438        // Validate lifecycle authorization before proceeding.
439        if let Some((ref target_actor_id, ref op)) = lifecycle_op {
440            let initiator = self
441                .actor_store
442                .get(&action.actor_id)
443                .await?
444                .ok_or_else(|| KernelError::ActorNotFound(action.actor_id.clone()))?;
445            let target_actor = self
446                .actor_store
447                .get(target_actor_id)
448                .await?
449                .ok_or_else(|| KernelError::ActorNotFound(target_actor_id.clone()))?;
450            lifecycle::validate_lifecycle_authorization(&initiator, &target_actor, op).await?;
451        }
452
453        let create_actor_spec = parse_create_actor_spec(&action, &self.actor_store).await?;
454        let create_envelope_spec =
455            parse_create_envelope_spec(&action, &self.envelope_store).await?;
456        let policy_version = parse_policy_version(&action);
457
458        // Step 2: QUOTE + Step 3: RESERVE
459        // PIP-001 §11d: Hold-approved actions already have energy reserved at trigger time.
460        // The sentinel `_hold_reserved_cost` carries the pre-reserved amount so we skip
461        // double-charging. We create a phantom EnergyReservation so settle correctly
462        // reduces reserved_energy without calling reserve() again.
463        let (reserved_cost, reservation) = if let Some(hold_reserved) = action
464            .payload
465            .get("_hold_reserved_cost")
466            .and_then(|v| v.as_i64())
467        {
468            let phantom = if hold_reserved > 0 {
469                Some(EnergyReservation {
470                    actor_id: action.actor_id.clone(),
471                    reserved: hold_reserved,
472                })
473            } else {
474                None
475            };
476            (hold_reserved, phantom)
477        } else {
478            // All actions pay quote_cost (action_cost + append_cost).
479            // For observe: action_cost=0 but append_cost >= 1 (Landauer).
480            let cost = quote_cost(&action) as i64;
481            let res = if cost > 0 {
482                Some(self.energy_ledger.reserve(&action.actor_id, cost).await?)
483            } else {
484                None
485            };
486            (cost, res)
487        };
488
489        // Step 4: VALIDATE EXECUTE PAYLOAD (PIP-002 §2 — for Execute actions only)
490        // The kernel does not execute anything. It validates the actor-submitted
491        // result and records it. Actor executes, kernel records.
492        let artifact_hash = if matches!(action.action_type, ActionType::Execute) {
493            Some(Self::validate_execute_payload(&action.payload)?)
494        } else {
495            None
496        };
497
498        // Step 5: SETTLE
499        // For PIP-002: settled cost equals reserved cost (no post-execution IO adjustment).
500        let settled_cost = reserved_cost;
501
502        // Step 6: APPEND
503        let mut event = EventRecord {
504            id: Uuid::new_v4().to_string(),
505            log_index: 0,
506            event_hash: String::new(),
507            actor_id: action.actor_id.clone(),
508            action_type: action.action_type.as_str().to_string(),
509            target: action.target.clone(),
510            payload: action.payload.clone(),
511            payload_hash: payload_hash_hex(&action)?,
512            artifact_hash: artifact_hash.clone(),
513            reserved_energy: reserved_cost,
514            settled_energy: settled_cost,
515            timestamp: now_millis_string(),
516        };
517        self.finalize_energy_and_event(
518            reservation.as_ref(),
519            settled_cost,
520            create_actor_spec.as_ref(),
521            create_envelope_spec.as_ref(),
522            &mut event,
523        )
524        .await?;
525
526        // Step 7: POST-COMMIT — envelope budget consumption + checkpoints + policy + lifecycle
527        if let Some(ref version) = policy_version {
528            if let Err(err) = self.state_store.set_policy_version(version).await {
529                warn!(error = %err, "failed to record policy version after commit");
530            } else {
531                info!(policy_version = %version, event_id = %event.id, "policy version updated");
532            }
533        }
534
535        // Phase 4b: Post-commit envelope budget consumption and checkpoint (PIP-001 §11).
536        if action.action_type.is_state_changing()
537            && settled_cost > 0
538            && let Ok(Some(actor)) = self.actor_store.get(&action.actor_id).await
539            && actor.actor_type == ActorType::Agent
540            && let Ok(Some(envelope)) = self
541                .envelope_store
542                .get_active_for_actor(&action.actor_id)
543                .await
544        {
545            // Check checkpoint BEFORE consuming (so we use pre-consume state).
546            let checkpoint = consent::check_checkpoint(&envelope, settled_cost);
547
548            // Consume budget.
549            let pool = self.state_store.pool();
550            let mut tx = pool.begin().await?;
551            match self
552                .envelope_store
553                .consume_budget_in_tx(&mut tx, &envelope.envelope_id, settled_cost)
554                .await
555            {
556                Ok(_new_consumed) => {
557                    // Handle checkpoint levels (PIP-001 §11).
558                    match checkpoint {
559                        Some(CheckpointLevel::Halt) => {
560                            self.envelope_store
561                                .set_status_in_tx(
562                                    &mut tx,
563                                    &envelope.envelope_id,
564                                    &consent::EnvelopeStatus::Expired,
565                                )
566                                .await?;
567                            info!(
568                                envelope_id = %envelope.envelope_id,
569                                actor_id = %action.actor_id,
570                                "checkpoint: HALT — envelope budget exhausted"
571                            );
572                        }
573                        Some(CheckpointLevel::Report) => {
574                            info!(
575                                envelope_id = %envelope.envelope_id,
576                                actor_id = %action.actor_id,
577                                budget = envelope.budget,
578                                consumed = envelope.budget_consumed + settled_cost,
579                                "checkpoint: REPORT — summary logged"
580                            );
581                        }
582                        None => {}
583                    }
584                    tx.commit().await?;
585                }
586                Err(err) => {
587                    warn!(
588                        error = %err,
589                        "envelope budget consumption failed (event already committed)"
590                    );
591                    // Transaction is automatically rolled back on drop.
592                }
593            }
594        }
595
596        // Phase 4a: Execute lifecycle operation after event is committed.
597        if let Some((ref target_actor_id, ref op)) = lifecycle_op {
598            let pool = self.state_store.pool();
599            match op {
600                punkgo_core::actor::LifecycleOp::Freeze { .. } => {
601                    match lifecycle::execute_freeze(&self.actor_store, &pool, target_actor_id).await
602                    {
603                        Ok(frozen_ids) => {
604                            info!(
605                                target = %target_actor_id,
606                                cascade_count = frozen_ids.len(),
607                                "lifecycle: freeze executed"
608                            );
609                        }
610                        Err(err) => {
611                            warn!(error = %err, "lifecycle: freeze failed (event already committed)");
612                        }
613                    }
614                }
615                punkgo_core::actor::LifecycleOp::Unfreeze => {
616                    match lifecycle::execute_unfreeze(&self.actor_store, &pool, target_actor_id)
617                        .await
618                    {
619                        Ok(()) => {
620                            info!(target = %target_actor_id, "lifecycle: unfreeze executed");
621                        }
622                        Err(err) => {
623                            warn!(error = %err, "lifecycle: unfreeze failed (event already committed)");
624                        }
625                    }
626                }
627                punkgo_core::actor::LifecycleOp::Terminate { .. } => {
628                    // Terminate sets the actor to frozen permanently.
629                    // Orphan handling is a future enhancement — for now, children are cascade-frozen.
630                    match lifecycle::execute_freeze(&self.actor_store, &pool, target_actor_id).await
631                    {
632                        Ok(frozen_ids) => {
633                            info!(
634                                target = %target_actor_id,
635                                cascade_count = frozen_ids.len(),
636                                "lifecycle: terminate executed (cascade frozen)"
637                            );
638                        }
639                        Err(err) => {
640                            warn!(error = %err, "lifecycle: terminate failed (event already committed)");
641                        }
642                    }
643                }
644            }
645        }
646
647        Ok(SubmitReceipt {
648            event_id: event.id,
649            log_index: event.log_index,
650            event_hash: event.event_hash,
651            reserved_cost,
652            settled_cost,
653            artifact_hash,
654        })
655    }
656
657    async fn finalize_energy_and_event(
658        &self,
659        reservation: Option<&EnergyReservation>,
660        settled_cost: i64,
661        create_actor: Option<&CreateActorSpec>,
662        create_envelope: Option<&(String, EnvelopeSpec, Option<String>)>,
663        event: &mut EventRecord,
664    ) -> KernelResult<()> {
665        let pool = self.state_store.pool();
666        let mut tx = pool.begin().await?;
667
668        // Energy settlement (if reservation exists).
669        if let Some(res) = reservation {
670            self.energy_ledger
671                .settle_in_tx(&mut tx, &res.actor_id, res.reserved, settled_cost)
672                .await?;
673        }
674
675        // Actor creation (if applicable).
676        if let Some(spec) = create_actor {
677            self.energy_ledger
678                .create_actor_in_tx(&mut tx, &spec.actor_id, spec.energy_balance)
679                .await?;
680            self.actor_store.create_in_tx(&mut tx, spec).await?;
681            info!(
682                created_actor = %spec.actor_id,
683                actor_type = spec.actor_type.as_str(),
684                energy_balance = spec.energy_balance,
685                "actor created in transaction"
686            );
687        }
688
689        // Envelope creation (if applicable, whitepaper §3 invariant 6).
690        if let Some((envelope_id, spec, parent_id)) = create_envelope {
691            self.envelope_store
692                .create_in_tx(&mut tx, envelope_id, spec, parent_id.as_deref())
693                .await?;
694            info!(
695                envelope_id = %envelope_id,
696                actor_id = %spec.actor_id,
697                grantor_id = %spec.grantor_id,
698                budget = spec.budget,
699                "envelope created in transaction"
700            );
701        }
702
703        // Event append.
704        self.event_log.append_in_tx(&mut tx, event).await?;
705
706        // Audit trail update — atomic with event (whitepaper §3 invariant 5).
707        let log_index = event.log_index as u64;
708        self.audit_log
709            .append_leaf_in_tx(&mut tx, log_index, &event.event_hash)
710            .await
711            .map_err(|e| KernelError::Audit(e.to_string()))?;
712
713        // Checkpoint is NOT generated on every event. It is a derived artifact
714        // (tree root computed from leaf hashes) and can be generated on-demand
715        // when queried (receipt, show, verify) or via explicit checkpoint command.
716        // This keeps the write path fast and lock-free for concurrent access.
717        tx.commit().await?;
718        info!(event_id = %event.id, log_index = event.log_index, "event committed");
719        Ok(())
720    }
721
722    /// PIP-002 §2+§3: Validate execute action payload.
723    ///
724    /// The kernel MUST reject an execute submission that is missing any required
725    /// field or uses an invalid OID format. Returns the artifact_hash on success.
726    fn validate_execute_payload(payload: &Value) -> KernelResult<String> {
727        Self::require_valid_oid(payload, "input_oid")?;
728        Self::require_valid_oid(payload, "output_oid")?;
729
730        if payload.get("exit_code").and_then(|v| v.as_i64()).is_none() {
731            return Err(KernelError::ExecutePayloadInvalid(
732                "missing or invalid exit_code (must be integer)".to_string(),
733            ));
734        }
735
736        let artifact_hash = Self::require_valid_oid(payload, "artifact_hash")?;
737        Ok(artifact_hash)
738    }
739
740    /// Validate that a payload field exists and matches OID format: `sha256:<64 hex chars>`.
741    fn require_valid_oid(payload: &Value, field: &str) -> KernelResult<String> {
742        let val = payload
743            .get(field)
744            .and_then(|v| v.as_str())
745            .ok_or_else(|| KernelError::ExecutePayloadInvalid(format!("missing {field}")))?;
746
747        if !val.starts_with("sha256:") || val.len() != 71 {
748            return Err(KernelError::ExecutePayloadInvalid(format!(
749                "{field} must be sha256:<64 hex chars>, got: {val}"
750            )));
751        }
752        let hex_part = &val[7..];
753        if !hex_part.chars().all(|c| c.is_ascii_hexdigit()) {
754            return Err(KernelError::ExecutePayloadInvalid(format!(
755                "{field} contains non-hex characters: {val}"
756            )));
757        }
758
759        Ok(val.to_string())
760    }
761
762    async fn read_query(&self, query: ReadQuery) -> KernelResult<Value> {
763        // Visibility boundary gate for read queries.
764        // Currently all-transparent. When multi-user collaboration is
765        // introduced, check_read_access will enforce readable scope per
766        // requester. See PIP-001 §8 (writable_targets).
767        let requester = query.requester_id.as_deref().unwrap_or("anonymous");
768        check_read_access(requester, &query.kind)?;
769
770        match query.kind.as_str() {
771            "health" => Ok(json!({ "status": "ok" })),
772            "actor_energy" => {
773                let actor_id = query.actor_id.ok_or_else(|| {
774                    KernelError::InvalidRequest("actor_id is required for actor_energy".to_string())
775                })?;
776                let (energy_balance, reserved_energy) =
777                    self.energy_ledger.balance_view(&actor_id).await?;
778                Ok(json!({
779                    "actor_id": actor_id,
780                    "energy_balance": energy_balance,
781                    "reserved_energy": reserved_energy
782                }))
783            }
784            "events" => {
785                let limit = query.limit.unwrap_or(20).clamp(1, 500);
786                let events = self
787                    .event_log
788                    .query(
789                        query.actor_id.as_deref(),
790                        query.before_index,
791                        query.after_index,
792                        limit,
793                    )
794                    .await?;
795                // Pagination metadata: smallest log_index in result set.
796                // Client passes this as `before_index` to fetch the next page.
797                let next_cursor = events.last().map(|e| e.log_index);
798                let has_more = events.len() as i64 == limit;
799                Ok(json!({
800                    "events": events,
801                    "has_more": has_more,
802                    "next_cursor": next_cursor
803                }))
804            }
805            "stats" => {
806                let event_count = self.event_log.count().await?;
807                Ok(json!({ "event_count": event_count }))
808            }
809            "snapshot" => {
810                // Legacy: snapshot is superseded by audit checkpoint.
811                // Return audit checkpoint data for backward compatibility.
812                let event_count = self.event_log.count().await?;
813                self.audit_log
814                    .ensure_checkpoint(event_count as u64)
815                    .await
816                    .map_err(|e| KernelError::Audit(e.to_string()))?;
817                let cp = self
818                    .audit_log
819                    .latest_checkpoint()
820                    .await
821                    .map_err(|e| KernelError::Audit(e.to_string()))?;
822                Ok(json!({
823                    "event_count": event_count,
824                    "snapshot_hash": cp.root_hash,
825                    "generated_at": cp.created_at
826                }))
827            }
828            "paths" => {
829                let paths = self.state_store.paths();
830                Ok(json!({
831                    "root": paths.root.display().to_string(),
832                    "workspace_root": paths.workspace_root.display().to_string(),
833                    "quarantine_root": paths.quarantine_root.display().to_string(),
834                    "db_path": paths.db_path.display().to_string()
835                }))
836            }
837            "audit_checkpoint" => {
838                let event_count = self.event_log.count().await?;
839                self.audit_log
840                    .ensure_checkpoint(event_count as u64)
841                    .await
842                    .map_err(|e| KernelError::Audit(e.to_string()))?;
843                let cp = self
844                    .audit_log
845                    .latest_checkpoint()
846                    .await
847                    .map_err(|e| KernelError::Audit(e.to_string()))?;
848                Ok(serde_json::to_value(cp)?)
849            }
850            "audit_inclusion_proof" => {
851                let log_index = query.log_index.ok_or_else(|| {
852                    KernelError::InvalidRequest(
853                        "log_index is required for audit_inclusion_proof".to_string(),
854                    )
855                })? as u64;
856                let tree_size = match query.tree_size {
857                    Some(s) => s as u64,
858                    None => {
859                        // Ensure checkpoint is current before deriving tree_size.
860                        let event_count = self.event_log.count().await? as u64;
861                        self.audit_log
862                            .ensure_checkpoint(event_count)
863                            .await
864                            .map_err(|e| KernelError::Audit(e.to_string()))?;
865                        self.audit_log
866                            .tree_size()
867                            .await
868                            .map_err(|e| KernelError::Audit(e.to_string()))?
869                    }
870                };
871                let proof = self
872                    .audit_log
873                    .inclusion_proof(log_index, tree_size)
874                    .await
875                    .map_err(|e| KernelError::Audit(e.to_string()))?;
876                Ok(json!({
877                    "log_index": log_index,
878                    "tree_size": tree_size,
879                    "proof": proof
880                }))
881            }
882            "audit_consistency_proof" => {
883                let old_size = query.old_size.ok_or_else(|| {
884                    KernelError::InvalidRequest(
885                        "old_size is required for audit_consistency_proof".to_string(),
886                    )
887                })? as u64;
888                let new_size = query.tree_size.ok_or_else(|| {
889                    KernelError::InvalidRequest(
890                        "tree_size is required for audit_consistency_proof".to_string(),
891                    )
892                })? as u64;
893                let proof = self
894                    .audit_log
895                    .consistency_proof(old_size, new_size)
896                    .await
897                    .map_err(|e| KernelError::Audit(e.to_string()))?;
898                Ok(json!({
899                    "old_size": old_size,
900                    "new_size": new_size,
901                    "proof": proof
902                }))
903            }
904            // Phase 2: stellar configuration read query.
905            "stellar_info" => {
906                let config = &self.stellar_config;
907                Ok(json!({
908                    "gpu_model": config.gpu_model,
909                    "cpu_model": config.cpu_model,
910                    "int8_tops": config.int8_tops,
911                    "energy_per_tick": config.effective_energy_per_tick(),
912                    "tick_interval_ms": config.tick_interval_ms,
913                    "luminosity_source": config.luminosity_source
914                }))
915            }
916            // Phase 4b: envelope_info read query — retrieve active envelope for an actor.
917            "envelope_info" => {
918                let actor_id = query.actor_id.ok_or_else(|| {
919                    KernelError::InvalidRequest(
920                        "actor_id is required for envelope_info".to_string(),
921                    )
922                })?;
923                let envelope = self.envelope_store.get_active_for_actor(&actor_id).await?;
924                match envelope {
925                    Some(record) => Ok(serde_json::to_value(record)?),
926                    None => Ok(json!({ "actor_id": actor_id, "envelope": null })),
927                }
928            }
929            // Phase 1: actor_info read query — retrieve actor record.
930            "actor_info" => {
931                let actor_id = query.actor_id.ok_or_else(|| {
932                    KernelError::InvalidRequest("actor_id is required for actor_info".to_string())
933                })?;
934                let actor = self.actor_store.get(&actor_id).await?;
935                match actor {
936                    Some(record) => Ok(serde_json::to_value(record)?),
937                    None => Err(KernelError::ActorNotFound(actor_id)),
938                }
939            }
940            // PIP-001 §11: hold_info — retrieve a single hold_request by hold_id.
941            "hold_info" => {
942                let hold_id = query.hold_id.ok_or_else(|| {
943                    KernelError::InvalidRequest("hold_id is required for hold_info".to_string())
944                })?;
945                let hold = self.envelope_store.get_hold_request(&hold_id).await?;
946                match hold {
947                    Some(record) => Ok(record),
948                    None => Err(KernelError::InvalidRequest(format!(
949                        "hold_request not found: {hold_id}"
950                    ))),
951                }
952            }
953            // PIP-001 §11: holds_pending — list pending hold_requests, optionally filtered by agent.
954            "holds_pending" => {
955                let holds = self
956                    .envelope_store
957                    .list_pending_holds(query.actor_id.as_deref())
958                    .await?;
959                Ok(json!({ "holds": holds }))
960            }
961            other => Err(KernelError::InvalidRequest(format!(
962                "unsupported read query kind: {other}"
963            ))),
964        }
965    }
966}
967
968fn now_millis_string() -> String {
969    let now = std::time::SystemTime::now()
970        .duration_since(std::time::UNIX_EPOCH)
971        .unwrap_or_default();
972    now.as_millis().to_string()
973}
974
975fn now_millis_u64() -> u64 {
976    std::time::SystemTime::now()
977        .duration_since(std::time::UNIX_EPOCH)
978        .unwrap_or_default()
979        .as_millis() as u64
980}
981
982/// If the action is a `system/policy` Create, extract the policy version string.
983/// Returns `None` for all other actions.
984fn parse_policy_version(action: &Action) -> Option<String> {
985    if !matches!(action.action_type, ActionType::Create) || action.target != "system/policy" {
986        return None;
987    }
988    action
989        .payload
990        .get("version")
991        .and_then(|v| v.as_str())
992        .map(|s| s.to_string())
993}
994
995/// Phase 1: Parse actor creation spec from a `ledger/actor` Create action.
996///
997/// Backward compatible: old format `{actor_id, energy_balance}` is supported
998/// by defaulting to actor_type=agent, purpose="legacy", writable_targets=[].
999///
1000/// New format adds: actor_type, purpose, writable_targets, energy_share.
1001async fn parse_create_actor_spec(
1002    action: &Action,
1003    actor_store: &ActorStore,
1004) -> KernelResult<Option<CreateActorSpec>> {
1005    if !matches!(action.action_type, ActionType::Create) || action.target != "ledger/actor" {
1006        return Ok(None);
1007    }
1008
1009    let payload = action.payload.as_object().ok_or_else(|| {
1010        KernelError::InvalidRequest("seed actor payload must be an object".to_string())
1011    })?;
1012
1013    // Required: actor_id (or derive from purpose)
1014    let explicit_id = payload.get("actor_id").and_then(Value::as_str);
1015    let purpose = payload
1016        .get("purpose")
1017        .and_then(Value::as_str)
1018        .unwrap_or("legacy");
1019    let energy_balance = payload
1020        .get("energy_balance")
1021        .and_then(Value::as_i64)
1022        .unwrap_or(1000);
1023
1024    // Actor type: default to "agent" for backward compat
1025    let actor_type_str = payload
1026        .get("actor_type")
1027        .and_then(Value::as_str)
1028        .unwrap_or("agent");
1029    let actor_type = ActorType::parse(actor_type_str).ok_or_else(|| {
1030        KernelError::InvalidRequest(format!("invalid actor_type: {actor_type_str}"))
1031    })?;
1032
1033    // Derive ID if not explicitly provided
1034    let actor_id = if let Some(id) = explicit_id {
1035        id.to_string()
1036    } else {
1037        if purpose == "legacy" {
1038            return Err(KernelError::InvalidRequest(
1039                "actor_id or purpose is required to create an actor".to_string(),
1040            ));
1041        }
1042        let seq = actor_store.next_sequence(&action.actor_id, purpose).await?;
1043        derive_agent_id(&action.actor_id, purpose, seq)
1044    };
1045
1046    // Build lineage from creator (PIP-001 §7) + enforce §5 (only humans create agents)
1047    let creator_record = actor_store.get(&action.actor_id).await?;
1048    let (creator_type, creator_lineage) = match &creator_record {
1049        Some(record) => (record.actor_type.clone(), record.lineage.clone()),
1050        // Fallback for legacy actors not yet in actors table
1051        None => (ActorType::Human, vec![]),
1052    };
1053
1054    // PIP-001 §5: Only Humans can create Agents. Reject Agent as creator.
1055    if creator_type == ActorType::Agent && actor_type == ActorType::Agent {
1056        return Err(KernelError::PolicyViolation(
1057            "agents cannot create agents — creation right belongs to humans (PIP-001 §5)"
1058                .to_string(),
1059        ));
1060    }
1061
1062    let lineage = build_lineage(&creator_type, &action.actor_id, &creator_lineage);
1063
1064    // Writable targets: parse from payload or default to empty
1065    let writable_targets: Vec<WritableTarget> = if let Some(targets_val) =
1066        payload.get("writable_targets")
1067    {
1068        serde_json::from_value(targets_val.clone())
1069            .map_err(|e| KernelError::InvalidRequest(format!("invalid writable_targets: {e}")))?
1070    } else {
1071        vec![]
1072    };
1073
1074    // Phase 3 PIP-001 §11: validate child targets are subset of creator's boundary.
1075    if !writable_targets.is_empty()
1076        && let Some(ref creator) = creator_record
1077    {
1078        validate_child_targets(creator, &writable_targets)?;
1079    }
1080
1081    // Energy share: default 0.0
1082    let energy_share = payload
1083        .get("energy_share")
1084        .and_then(Value::as_f64)
1085        .unwrap_or(0.0);
1086
1087    let reduction_policy = payload
1088        .get("reduction_policy")
1089        .and_then(Value::as_str)
1090        .unwrap_or("none")
1091        .to_string();
1092
1093    Ok(Some(CreateActorSpec {
1094        actor_id,
1095        actor_type,
1096        creator_id: action.actor_id.clone(),
1097        lineage,
1098        purpose: Some(purpose.to_string()),
1099        writable_targets,
1100        energy_balance,
1101        energy_share,
1102        reduction_policy,
1103    }))
1104}
1105
1106/// Phase 4b: Parse envelope creation spec from a `ledger/envelope` Create action.
1107///
1108/// Returns (envelope_id, EnvelopeSpec, parent_envelope_id) if applicable.
1109/// The grantor is always the action's actor_id (the human or parent agent granting
1110/// the envelope).
1111///
1112/// PIP-001 §11: If the grantor is an agent, their active envelope is used as the parent
1113/// and reduction validation is performed.
1114async fn parse_create_envelope_spec(
1115    action: &Action,
1116    envelope_store: &EnvelopeStore,
1117) -> KernelResult<Option<(String, EnvelopeSpec, Option<String>)>> {
1118    if !matches!(action.action_type, ActionType::Create) || action.target != "ledger/envelope" {
1119        return Ok(None);
1120    }
1121
1122    let payload = action.payload.as_object().ok_or_else(|| {
1123        KernelError::InvalidRequest("envelope payload must be an object".to_string())
1124    })?;
1125
1126    let actor_id = payload
1127        .get("actor_id")
1128        .and_then(Value::as_str)
1129        .ok_or_else(|| {
1130            KernelError::InvalidRequest("actor_id is required to create an envelope".to_string())
1131        })?
1132        .to_string();
1133
1134    let budget = payload
1135        .get("budget")
1136        .and_then(Value::as_i64)
1137        .ok_or_else(|| {
1138            KernelError::InvalidRequest("budget is required to create an envelope".to_string())
1139        })?;
1140
1141    if budget <= 0 {
1142        return Err(KernelError::InvalidRequest(
1143            "envelope budget must be positive".to_string(),
1144        ));
1145    }
1146
1147    let targets: Vec<String> = if let Some(targets_val) = payload.get("targets") {
1148        serde_json::from_value(targets_val.clone())
1149            .map_err(|e| KernelError::InvalidRequest(format!("invalid envelope targets: {e}")))?
1150    } else {
1151        return Err(KernelError::InvalidRequest(
1152            "targets are required to create an envelope".to_string(),
1153        ));
1154    };
1155
1156    let actions: Vec<String> = if let Some(actions_val) = payload.get("actions") {
1157        serde_json::from_value(actions_val.clone())
1158            .map_err(|e| KernelError::InvalidRequest(format!("invalid envelope actions: {e}")))?
1159    } else {
1160        return Err(KernelError::InvalidRequest(
1161            "actions are required to create an envelope".to_string(),
1162        ));
1163    };
1164
1165    let duration_secs = payload.get("duration_secs").and_then(Value::as_i64);
1166    let report_every = payload.get("report_every").and_then(Value::as_i64);
1167    let hold_timeout_secs = payload.get("hold_timeout_secs").and_then(Value::as_i64);
1168
1169    // PIP-001 §11a: Parse hold_on rules.
1170    let hold_on: Vec<HoldRule> = if let Some(hold_on_val) = payload.get("hold_on") {
1171        serde_json::from_value(hold_on_val.clone())
1172            .map_err(|e| KernelError::InvalidRequest(format!("invalid hold_on rules: {e}")))?
1173    } else {
1174        vec![]
1175    };
1176
1177    let spec = EnvelopeSpec {
1178        actor_id,
1179        grantor_id: action.actor_id.clone(),
1180        budget,
1181        targets,
1182        actions,
1183        duration_secs,
1184        report_every,
1185        hold_on,
1186        hold_timeout_secs,
1187    };
1188
1189    // PIP-001 §11: If grantor is an agent with an active envelope, validate reduction.
1190    let parent_envelope_id = if let Some(grantor_envelope) = envelope_store
1191        .get_active_for_actor(&action.actor_id)
1192        .await?
1193    {
1194        consent::validate_envelope_reduction(&grantor_envelope, &spec)?;
1195        Some(grantor_envelope.envelope_id)
1196    } else {
1197        None
1198    };
1199
1200    let envelope_id = Uuid::new_v4().to_string();
1201
1202    Ok(Some((envelope_id, spec, parent_envelope_id)))
1203}
1204
1205// ---------------------------------------------------------------------------
1206// PIP-001 §11d: Hold response parsing and execution
1207// ---------------------------------------------------------------------------
1208
1209/// Parse a `ledger/hold/<hold_id>` mutate target into (hold_id, decision, instruction).
1210///
1211/// Returns `None` for any other target.
1212fn parse_hold_response(target: &str, payload: &Value) -> Option<(String, String, Option<String>)> {
1213    // Pattern: "ledger/hold/<hold_id>"
1214    let rest = target.strip_prefix("ledger/hold/")?;
1215    // Ensure there is a non-empty hold_id segment and nothing after it.
1216    if rest.is_empty() || rest.contains('/') {
1217        return None;
1218    }
1219    let hold_id = rest.to_string();
1220
1221    let decision = payload.get("decision").and_then(Value::as_str)?.to_string();
1222
1223    // Validate decision value.
1224    if decision != "approve" && decision != "reject" {
1225        return None;
1226    }
1227
1228    let instruction = payload
1229        .get("instruction")
1230        .and_then(Value::as_str)
1231        .map(|s| s.to_string());
1232
1233    Some((hold_id, decision, instruction))
1234}
1235
1236impl Kernel {
1237    /// PIP-001 §11d: Execute a hold_response (approve or reject).
1238    ///
1239    /// Validates that:
1240    /// 1. The caller is a Human actor (whitepaper §2: Human sovereignty).
1241    /// 2. The hold_request exists and is still pending.
1242    /// 3. The hold envelope belongs to an Agent actor.
1243    ///
1244    /// On approve: re-executes the pending action and writes a `hold_response` event.
1245    /// On reject: discards the pending action and writes a `hold_response` event.
1246    async fn execute_hold_response(
1247        &self,
1248        human_action: &Action,
1249        hold_id: &str,
1250        decision: &str,
1251        instruction: Option<&str>,
1252    ) -> KernelResult<SubmitReceipt> {
1253        // --- Validate caller is Human ---
1254        let caller = self
1255            .actor_store
1256            .get(&human_action.actor_id)
1257            .await?
1258            .ok_or_else(|| KernelError::ActorNotFound(human_action.actor_id.clone()))?;
1259
1260        if caller.actor_type != punkgo_core::actor::ActorType::Human {
1261            return Err(KernelError::PolicyViolation(format!(
1262                "only Human actors can resolve holds; caller {} is {:?}",
1263                human_action.actor_id, caller.actor_type
1264            )));
1265        }
1266
1267        // --- Load hold_request ---
1268        let hold_record = self
1269            .envelope_store
1270            .get_hold_request(hold_id)
1271            .await?
1272            .ok_or_else(|| {
1273                KernelError::PolicyViolation(format!("hold_request not found: {hold_id}"))
1274            })?;
1275
1276        let _envelope_id = hold_record
1277            .get("envelope_id")
1278            .and_then(Value::as_str)
1279            .ok_or_else(|| {
1280                KernelError::PolicyViolation("hold_request missing envelope_id".to_string())
1281            })?
1282            .to_string();
1283
1284        let agent_id = hold_record
1285            .get("agent_id")
1286            .and_then(Value::as_str)
1287            .ok_or_else(|| {
1288                KernelError::PolicyViolation("hold_request missing agent_id".to_string())
1289            })?
1290            .to_string();
1291
1292        let pending_payload = hold_record
1293            .get("pending_payload")
1294            .cloned()
1295            .unwrap_or(Value::Null);
1296
1297        let trigger_target = hold_record
1298            .get("trigger_target")
1299            .and_then(Value::as_str)
1300            .unwrap_or("")
1301            .to_string();
1302
1303        let trigger_action = hold_record
1304            .get("trigger_action")
1305            .and_then(Value::as_str)
1306            .unwrap_or("")
1307            .to_string();
1308
1309        let status = hold_record
1310            .get("status")
1311            .and_then(Value::as_str)
1312            .unwrap_or("");
1313        if status != "pending" {
1314            return Err(KernelError::PolicyViolation(format!(
1315                "hold_request {hold_id} is already resolved (status={status})"
1316            )));
1317        }
1318
1319        // PIP-001 §11e: Check if this hold has already timed out.
1320        // If so, run the lazy expiry first — it will auto-reject the hold.
1321        let envelope_id_str = hold_record
1322            .get("envelope_id")
1323            .and_then(Value::as_str)
1324            .unwrap_or("")
1325            .to_string();
1326        if let Ok(Some(env)) = self.envelope_store.get(&envelope_id_str).await {
1327            if let Some(timeout_secs) = env.hold_timeout_secs {
1328                if timeout_secs > 0 {
1329                    let triggered_at: u64 = hold_record
1330                        .get("triggered_at")
1331                        .and_then(|v| v.as_str())
1332                        .and_then(|s| s.parse().ok())
1333                        .unwrap_or(0);
1334                    let now = now_millis_u64();
1335                    if now.saturating_sub(triggered_at) > (timeout_secs as u64) * 1000 {
1336                        // Expire this hold first, then return error.
1337                        self.expire_timed_out_holds(&envelope_id_str, timeout_secs)
1338                            .await?;
1339                        return Err(KernelError::PolicyViolation(format!(
1340                            "hold_request {hold_id} has timed out and was auto-rejected"
1341                        )));
1342                    }
1343                }
1344            }
1345        }
1346
1347        // --- Build hold_response event payload (PIP-001 §11d) ---
1348        let response_payload = json!({
1349            "hold_id": hold_id,
1350            "agent_id": &agent_id,
1351            "decision": decision,
1352            "instruction": instruction,
1353            "trigger": {
1354                "target": &trigger_target,
1355                "action_type": &trigger_action
1356            },
1357            "resolved_by": &human_action.actor_id,
1358            "resolved_at": now_millis_string()
1359        });
1360
1361        let response_action = Action {
1362            actor_id: human_action.actor_id.clone(),
1363            action_type: ActionType::Mutate,
1364            target: format!("ledger/hold/{hold_id}"),
1365            payload: response_payload.clone(),
1366            timestamp: None,
1367        };
1368
1369        // --- Atomic transaction: resolve hold_request + settle energy + write event ---
1370        // Envelope stays Active throughout — no status change needed (PIP-001 §11b).
1371        let pool = self.state_store.pool();
1372        let mut tx = pool.begin().await?;
1373
1374        self.envelope_store
1375            .resolve_hold_request_in_tx(&mut tx, hold_id, decision, instruction)
1376            .await?;
1377
1378        // PIP-001 §11f: On reject, settle commitment cost (20% of reserved).
1379        // settle_in_tx(reserved=full, actual=commitment) releases the rest.
1380        let hold_reserved_cost = pending_payload
1381            .get("reserved_cost")
1382            .and_then(|v| v.as_i64())
1383            .unwrap_or(0);
1384
1385        let commitment_cost = if decision == "reject" && hold_reserved_cost > 0 {
1386            let cost = ((hold_reserved_cost as f64) * 0.2).ceil() as i64;
1387            self.energy_ledger
1388                .settle_in_tx(&mut tx, &agent_id, hold_reserved_cost, cost)
1389                .await?;
1390            cost
1391        } else {
1392            0
1393        };
1394
1395        let mut response_event = EventRecord {
1396            id: Uuid::new_v4().to_string(),
1397            log_index: 0,
1398            event_hash: String::new(),
1399            actor_id: human_action.actor_id.clone(),
1400            action_type: "hold_response".to_string(),
1401            target: format!("ledger/hold/{hold_id}"),
1402            payload: response_payload.clone(),
1403            payload_hash: payload_hash_hex(&response_action)?,
1404            artifact_hash: None,
1405            reserved_energy: hold_reserved_cost,
1406            settled_energy: commitment_cost,
1407            timestamp: now_millis_string(),
1408        };
1409        self.event_log
1410            .append_in_tx(&mut tx, &mut response_event)
1411            .await?;
1412
1413        // Audit trail — atomic with event (whitepaper §3 invariant 5).
1414        let log_index = response_event.log_index as u64;
1415        self.audit_log
1416            .append_leaf_in_tx(&mut tx, log_index, &response_event.event_hash)
1417            .await
1418            .map_err(|e| KernelError::Audit(e.to_string()))?;
1419        // Checkpoint generated lazily on read, not here.
1420        tx.commit().await?;
1421
1422        info!(
1423            hold_id = %hold_id,
1424            decision = %decision,
1425            agent_id = %agent_id,
1426            resolved_by = %human_action.actor_id,
1427            "PIP-001 §11d: hold resolved"
1428        );
1429
1430        // --- Approve: re-execute the pending action (PIP-001 §11d) ---
1431        if decision == "approve" {
1432            // Reconstruct the original action from the pending_payload stored in hold_request.
1433            let orig_target = pending_payload
1434                .get("target")
1435                .and_then(Value::as_str)
1436                .unwrap_or(&trigger_target)
1437                .to_string();
1438            let orig_action_type_str = pending_payload
1439                .get("action_type")
1440                .and_then(Value::as_str)
1441                .unwrap_or(&trigger_action)
1442                .to_string();
1443            let mut orig_payload = pending_payload
1444                .get("payload")
1445                .cloned()
1446                .unwrap_or(Value::Null);
1447
1448            // PIP-001 §11d: Energy was already reserved at hold trigger time.
1449            // `reserved_cost` was stored in pending_payload by the hold trigger.
1450            // Inject `_hold_reserved_cost` so submit_action skips double quote/reserve.
1451
1452            // Mark the payload as hold-approved so the re-execution bypasses the hold check (PIP-001 §11d).
1453            // This sentinel prevents a re-trigger loop.
1454            if let Some(obj) = orig_payload.as_object_mut() {
1455                obj.insert("_hold_approved".to_string(), Value::Bool(true));
1456                obj.insert(
1457                    "_hold_reserved_cost".to_string(),
1458                    Value::Number(hold_reserved_cost.into()),
1459                );
1460                // PIP-001 §11d: If instruction is present, append it to the payload.
1461                if let Some(instr) = instruction {
1462                    obj.insert(
1463                        "_hold_instruction".to_string(),
1464                        Value::String(instr.to_string()),
1465                    );
1466                }
1467            } else {
1468                // If payload is not an object (e.g. null), wrap it.
1469                let mut obj = serde_json::Map::new();
1470                obj.insert("_hold_approved".to_string(), Value::Bool(true));
1471                obj.insert(
1472                    "_hold_reserved_cost".to_string(),
1473                    Value::Number(hold_reserved_cost.into()),
1474                );
1475                if let Some(instr) = instruction {
1476                    obj.insert(
1477                        "_hold_instruction".to_string(),
1478                        Value::String(instr.to_string()),
1479                    );
1480                }
1481                orig_payload = Value::Object(obj);
1482            }
1483
1484            let orig_action_type = match orig_action_type_str.as_str() {
1485                "observe" => ActionType::Observe,
1486                "create" => ActionType::Create,
1487                "mutate" => ActionType::Mutate,
1488                _ => ActionType::Execute,
1489            };
1490
1491            let pending_action = Action {
1492                actor_id: agent_id.clone(),
1493                action_type: orig_action_type,
1494                target: orig_target,
1495                payload: orig_payload,
1496                timestamp: None,
1497            };
1498
1499            info!(
1500                hold_id = %hold_id,
1501                agent_id = %agent_id,
1502                "PIP-001 §11d: re-executing approved pending action"
1503            );
1504
1505            // Re-submit the pending action on behalf of the agent.
1506            // This goes through the full pipeline (quota → reserve → execute → settle → append).
1507            // Use Box::pin to break the async recursion (submit_action → execute_hold_response → submit_action).
1508            return Box::pin(self.submit_action(pending_action)).await;
1509        }
1510
1511        // --- Reject: return a receipt with commitment cost (PIP-001 §11f) ---
1512        Ok(SubmitReceipt {
1513            event_id: response_event.id,
1514            log_index: response_event.log_index,
1515            event_hash: response_event.event_hash,
1516            reserved_cost: hold_reserved_cost,
1517            settled_cost: commitment_cost,
1518            artifact_hash: None,
1519        })
1520    }
1521
1522    /// PIP-001 §11e: Lazy expiry for timed-out holds.
1523    ///
1524    /// Called before hold-trigger checks in submit_action. Scans pending holds
1525    /// for the given envelope and auto-rejects any that have exceeded
1526    /// `hold_timeout_secs`. Same pattern as `is_envelope_expired()` — no
1527    /// background thread, just check-on-access.
1528    async fn expire_timed_out_holds(
1529        &self,
1530        envelope_id: &str,
1531        hold_timeout_secs: i64,
1532    ) -> KernelResult<()> {
1533        let holds = self
1534            .envelope_store
1535            .list_pending_holds_for_envelope(envelope_id)
1536            .await?;
1537        let now = now_millis_u64();
1538
1539        for hold in holds {
1540            let triggered_at: u64 = hold
1541                .get("triggered_at")
1542                .and_then(|v| v.as_str())
1543                .and_then(|s| s.parse().ok())
1544                .unwrap_or(0);
1545
1546            if now.saturating_sub(triggered_at) <= (hold_timeout_secs as u64) * 1000 {
1547                continue;
1548            }
1549
1550            // Timed out — treat as reject (PIP-001 §11e/§11f).
1551            let hold_id = hold.get("hold_id").and_then(|v| v.as_str()).unwrap_or("");
1552            let agent_id = hold.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
1553            let pending_payload = hold.get("pending_payload").cloned().unwrap_or(Value::Null);
1554            let reserved_cost = pending_payload
1555                .get("reserved_cost")
1556                .and_then(|v| v.as_i64())
1557                .unwrap_or(0);
1558
1559            let commitment_cost = if reserved_cost > 0 {
1560                ((reserved_cost as f64) * 0.2).ceil() as i64
1561            } else {
1562                0
1563            };
1564
1565            let timeout_payload = json!({
1566                "hold_id": hold_id,
1567                "agent_id": agent_id,
1568                "decision": "timed_out",
1569                "reserved_cost": reserved_cost,
1570                "commitment_cost": commitment_cost,
1571                "triggered_at": triggered_at.to_string(),
1572                "expired_at": now.to_string()
1573            });
1574            let timeout_action = Action {
1575                actor_id: agent_id.to_string(),
1576                action_type: ActionType::Mutate,
1577                target: format!("ledger/hold/{hold_id}"),
1578                payload: timeout_payload.clone(),
1579                timestamp: None,
1580            };
1581            let mut timeout_event = EventRecord {
1582                id: Uuid::new_v4().to_string(),
1583                log_index: 0,
1584                event_hash: String::new(),
1585                actor_id: agent_id.to_string(),
1586                action_type: "hold_timeout".to_string(),
1587                target: format!("ledger/hold/{hold_id}"),
1588                payload: timeout_payload,
1589                payload_hash: payload_hash_hex(&timeout_action)?,
1590                artifact_hash: None,
1591                reserved_energy: reserved_cost,
1592                settled_energy: commitment_cost,
1593                timestamp: now_millis_string(),
1594            };
1595
1596            let pool = self.state_store.pool();
1597            let mut tx = pool.begin().await?;
1598            self.envelope_store
1599                .resolve_hold_request_in_tx(&mut tx, hold_id, "timed_out", None)
1600                .await?;
1601            if reserved_cost > 0 {
1602                self.energy_ledger
1603                    .settle_in_tx(&mut tx, agent_id, reserved_cost, commitment_cost)
1604                    .await?;
1605            }
1606            self.event_log
1607                .append_in_tx(&mut tx, &mut timeout_event)
1608                .await?;
1609
1610            // Audit trail — atomic with event (whitepaper §3 invariant 5).
1611            let t_log_index = timeout_event.log_index as u64;
1612            self.audit_log
1613                .append_leaf_in_tx(&mut tx, t_log_index, &timeout_event.event_hash)
1614                .await
1615                .map_err(|e| KernelError::Audit(e.to_string()))?;
1616            // Checkpoint generated lazily on read, not here.
1617            tx.commit().await?;
1618
1619            info!(
1620                hold_id = %hold_id,
1621                agent_id = %agent_id,
1622                commitment_cost = commitment_cost,
1623                "PIP-001 §11e: hold auto-expired (timeout)"
1624            );
1625        }
1626        Ok(())
1627    }
1628}