Skip to main content

punkgo_kernel/runtime/
kernel.rs

1use std::path::PathBuf;
2
3use serde::{Deserialize, Serialize};
4use serde_json::{Value, json};
5use tracing::{info, warn};
6use uuid::Uuid;
7
8use crate::audit::AuditLog;
9use punkgo_core::action::{Action, ActionType, payload_hash_hex, quote_cost};
10use punkgo_core::actor::{
11    ActorType, CreateActorSpec, WritableTarget, build_lineage, derive_agent_id,
12};
13use punkgo_core::boundary::{check_writable_boundary, validate_child_targets};
14use punkgo_core::consent::{self, AuthorizationMode, CheckpointLevel, EnvelopeSpec, HoldRule};
15use punkgo_core::errors::{KernelError, KernelResult};
16use punkgo_core::policy::{check_read_access, validate_action};
17
18use super::lifecycle;
19use crate::state::{
20    ActorStore, EnergyLedger, EnergyReservation, EnvelopeStore, EventLog, EventRecord,
21    NewHoldRequest, StateStore,
22};
23use punkgo_core::protocol::{RequestEnvelope, RequestType, ResponseEnvelope};
24use punkgo_core::stellar::{StellarConfig, load_stellar_config};
25
26/// Configuration for bootstrapping the kernel.
27#[derive(Debug, Clone)]
28pub struct KernelConfig {
29    pub state_dir: PathBuf,
30    pub ipc_endpoint: String,
31}
32
33impl Default for KernelConfig {
34    fn default() -> Self {
35        let state_dir = std::env::var("PUNKGO_STATE_DIR")
36            .map(PathBuf::from)
37            .unwrap_or_else(|_| default_state_dir());
38        let ipc_endpoint =
39            std::env::var("PUNKGO_IPC_ENDPOINT").unwrap_or_else(|_| default_ipc_endpoint());
40        Self {
41            state_dir,
42            ipc_endpoint,
43        }
44    }
45}
46
47/// Default state directory: ~/.punkgo/state
48///
49/// Falls back to `./state` if home directory cannot be determined.
50/// Default IPC endpoint.
51///
52/// On Windows, uses a file-path named pipe (`\\.\pipe\punkgo-kernel`) because
53/// the `GenericNamespaced` namespace can hit "Access Denied" depending on
54/// Windows security policy. On Unix, uses a simple namespace name.
55fn default_ipc_endpoint() -> String {
56    if cfg!(windows) {
57        r"\\.\pipe\punkgo-kernel".to_string()
58    } else {
59        "punkgo-kernel".to_string()
60    }
61}
62
63fn default_state_dir() -> PathBuf {
64    if let Some(home) = home_dir() {
65        return home.join(".punkgo").join("state");
66    }
67    PathBuf::from("state")
68}
69
70fn home_dir() -> Option<PathBuf> {
71    // Unix / WSL
72    if let Some(home) = std::env::var_os("HOME") {
73        return Some(PathBuf::from(home));
74    }
75    // Windows primary
76    if let Some(profile) = std::env::var_os("USERPROFILE") {
77        return Some(PathBuf::from(profile));
78    }
79    // Windows fallback
80    let drive = std::env::var_os("HOMEDRIVE")?;
81    let path = std::env::var_os("HOMEPATH")?;
82    let mut p = PathBuf::from(drive);
83    p.push(path);
84    Some(p)
85}
86
87/// Cryptographic receipt returned after a successful action submission.
88///
89/// Contains the event ID, log index, event hash, and energy costs.
90/// This is the caller's proof that their action was committed to the
91/// append-only history.
92#[derive(Debug, Clone, Serialize)]
93pub struct SubmitReceipt {
94    pub event_id: String,
95    pub log_index: i64,
96    pub event_hash: String,
97    pub reserved_cost: i64,
98    pub settled_cost: i64,
99    pub artifact_hash: Option<String>,
100}
101
102#[derive(Debug, Deserialize)]
103struct ReadQuery {
104    kind: String,
105    actor_id: Option<String>,
106    /// PIP-001 §11: hold_id for hold_info queries.
107    #[serde(default)]
108    hold_id: Option<String>,
109    limit: Option<i64>,
110    /// For audit_inclusion_proof: the event's log_index to prove.
111    log_index: Option<i64>,
112    /// For audit_inclusion_proof / audit_consistency_proof: the tree size to prove against.
113    tree_size: Option<i64>,
114    /// For audit_consistency_proof: the older tree size.
115    old_size: Option<i64>,
116    /// Pagination: only return events with log_index < this value.
117    /// Used for backward pagination (fetching older events).
118    #[serde(default)]
119    before_index: Option<i64>,
120    /// Pagination: only return events with log_index > this value.
121    /// Used for forward pagination (fetching newer events).
122    #[serde(default)]
123    after_index: Option<i64>,
124    /// Identity of the requester. Used by visibility boundary checks.
125    /// Currently optional — when absent, all reads are permitted (single-user).
126    /// When multi-user collaboration is introduced, this becomes required.
127    #[serde(default)]
128    requester_id: Option<String>,
129}
130
131/// The PunkGo kernel — single-writer append-only event system.
132///
133/// The kernel is the **committer** (whitepaper §2): a structural role that
134/// provides a single linearization point for actions. It is not a judge.
135///
136/// Use [`Kernel::bootstrap`] to initialize from a state directory, then
137/// [`Kernel::handle_request`] to process IPC requests.
138pub struct Kernel {
139    state_store: StateStore,
140    energy_ledger: EnergyLedger,
141    event_log: EventLog,
142    audit_log: AuditLog,
143    actor_store: ActorStore,
144    envelope_store: EnvelopeStore,
145    stellar_config: StellarConfig,
146}
147
148impl Kernel {
149    /// Initialize the kernel from a state directory.
150    ///
151    /// Creates the SQLite database, loads stellar config, and prepares the root actor.
152    pub async fn bootstrap(config: &KernelConfig) -> KernelResult<Self> {
153        let state_store = StateStore::bootstrap(&config.state_dir).await?;
154        let energy_ledger = EnergyLedger::new(state_store.pool());
155        let event_log = EventLog::new(state_store.pool());
156        let audit_log = AuditLog::new(state_store.pool(), "punkgo/kernel");
157        let actor_store = ActorStore::new(state_store.pool());
158        let envelope_store = EnvelopeStore::new(state_store.pool());
159
160        // Phase 2: Load stellar configuration (PIP-001 §1).
161        let stellar_config_path = config.state_dir.join("stellar.toml");
162        let stellar_config = load_stellar_config(&stellar_config_path)?;
163        info!(
164            energy_per_tick = stellar_config.effective_energy_per_tick(),
165            int8_tops = stellar_config.int8_tops,
166            tick_interval_ms = stellar_config.tick_interval_ms,
167            "stellar configuration loaded"
168        );
169
170        Ok(Self {
171            state_store,
172            energy_ledger,
173            event_log,
174            audit_log,
175            actor_store,
176            envelope_store,
177            stellar_config,
178        })
179    }
180
181    /// Returns a reference to the stellar configuration for external use
182    /// (e.g., energy producer startup).
183    pub fn stellar_config(&self) -> &StellarConfig {
184        &self.stellar_config
185    }
186
187    /// Returns a clone of the energy ledger for external use (e.g., energy producer).
188    pub fn energy_ledger(&self) -> &EnergyLedger {
189        &self.energy_ledger
190    }
191
192    /// Returns a clone of the actor store for external use (e.g., energy producer).
193    pub fn actor_store(&self) -> &ActorStore {
194        &self.actor_store
195    }
196
197    /// Returns a reference to the envelope store for external use.
198    pub fn envelope_store(&self) -> &EnvelopeStore {
199        &self.envelope_store
200    }
201
202    /// Returns the SQLite pool for external use (e.g., energy producer transaction).
203    pub fn pool(&self) -> sqlx::SqlitePool {
204        self.state_store.pool()
205    }
206
207    /// Handle an incoming IPC request — dispatches to quote, submit, or read.
208    pub async fn handle_request(&self, req: RequestEnvelope) -> ResponseEnvelope {
209        let request_id = req.request_id.clone();
210        info!(
211            request_id = %request_id,
212            request_type = ?req.request_type,
213            "received request"
214        );
215        match self.dispatch(req).await {
216            Ok(payload) => {
217                info!(request_id = %request_id, "request completed");
218                ResponseEnvelope::ok(request_id, payload)
219            }
220            Err(err) => {
221                warn!(error = %err, "request failed");
222                ResponseEnvelope::err_structured(request_id, &err)
223            }
224        }
225    }
226
227    async fn dispatch(&self, req: RequestEnvelope) -> KernelResult<Value> {
228        match req.request_type {
229            RequestType::Quote => {
230                let action: Action = serde_json::from_value(req.payload)?;
231                validate_action(&action)?;
232                let cost = quote_cost(&action);
233                Ok(json!({ "cost": cost }))
234            }
235            RequestType::Submit => {
236                let action: Action = serde_json::from_value(req.payload)?;
237                let receipt = self.submit_action(action).await?;
238                Ok(serde_json::to_value(receipt)?)
239            }
240            RequestType::Read => {
241                let query: ReadQuery = serde_json::from_value(req.payload)?;
242                self.read_query(query).await
243            }
244        }
245    }
246
247    async fn submit_action(&self, action: Action) -> KernelResult<SubmitReceipt> {
248        // Step 1: VALIDATE — policy + actor existence + frozen check
249        validate_action(&action)?;
250        if !self.state_store.actor_exists(&action.actor_id).await? {
251            return Err(KernelError::ActorNotFound(action.actor_id.clone()));
252        }
253
254        // Phase 1: check frozen status via actors table.
255        // Phase 3: check writable boundary (PIP-001 §8/§9).
256        // Phase 4a: check lineage activity (PIP-001 §7) + lifecycle ops.
257        if let Some(actor) = self.actor_store.get(&action.actor_id).await? {
258            if actor.status == punkgo_core::actor::ActorStatus::Frozen
259                && action.action_type.is_state_changing()
260            {
261                return Err(KernelError::ActorFrozen(format!(
262                    "actor {} is frozen and cannot perform state-changing actions",
263                    action.actor_id
264                )));
265            }
266            // Phase 3: Boundary enforcement — check writable_targets.
267            check_writable_boundary(&actor, &action.target, &action.action_type)?;
268
269            // Phase 4a: Check lineage activity for agents (PIP-001 §7).
270            // Agents need their entire creation chain to be active.
271            if actor.actor_type == punkgo_core::actor::ActorType::Agent
272                && action.action_type.is_state_changing()
273            {
274                lifecycle::check_lineage_active(&self.actor_store, &actor.lineage).await?;
275            }
276
277            // Phase 4b: Authorization check (PIP-001 §5/§6/§11).
278            // Agents performing state-changing actions need an active envelope.
279            if action.action_type.is_state_changing() {
280                let envelope = self
281                    .envelope_store
282                    .get_active_for_actor(&action.actor_id)
283                    .await?;
284
285                // Lazy expiry check (PIP-001 §11 checkpoint): if envelope exists but is time-expired, mark it.
286                let envelope = if let Some(env) = envelope {
287                    if consent::is_envelope_expired(&env, now_millis_u64()) {
288                        self.envelope_store
289                            .set_status(&env.envelope_id, &consent::EnvelopeStatus::Expired)
290                            .await?;
291                        None
292                    } else {
293                        Some(env)
294                    }
295                } else {
296                    None
297                };
298
299                let auth_mode = consent::check_authorization(&actor, envelope.as_ref())?;
300
301                // For ManOnTheLoop, also verify envelope covers this specific action.
302                if auth_mode == AuthorizationMode::ManOnTheLoop
303                    && let Some(ref env) = envelope
304                {
305                    consent::check_envelope_covers(
306                        env,
307                        &action.target,
308                        action.action_type.as_str(),
309                    )?;
310
311                    // PIP-001 §11e: Lazy-expire timed-out holds before checking new triggers.
312                    if !env.hold_on.is_empty() {
313                        if let Some(timeout_secs) = env.hold_timeout_secs {
314                            if timeout_secs > 0 {
315                                self.expire_timed_out_holds(&env.envelope_id, timeout_secs)
316                                    .await?;
317                            }
318                        }
319                    }
320
321                    // PIP-001 §11b: Check hold_on rules — if triggered, create a hold_request
322                    // event and reserve energy before execution.
323                    // Skip the hold check if this action was already approved by a Human
324                    // (re-execution after approve; PIP-001 §11d).
325                    let hold_approved = action
326                        .payload
327                        .get("_hold_approved")
328                        .and_then(|v| v.as_bool())
329                        .unwrap_or(false);
330                    if !hold_approved
331                        && consent::check_hold_trigger(
332                            &env.hold_on,
333                            &action.target,
334                            action.action_type.as_str(),
335                        )
336                    {
337                        let hold_id = Uuid::new_v4().to_string();
338
339                        // PIP-001 §11c: Commit = reserve energy (gasLimit model).
340                        // Reserves energy upfront so agent can't overspend while hold is pending.
341                        let reserved_cost = quote_cost(&action) as i64;
342
343                        let hold_payload = json!({
344                            "hold_id": &hold_id,
345                            "agent_id": &action.actor_id,
346                            "trigger": {
347                                "target": &action.target,
348                                "action_type": action.action_type.as_str()
349                            },
350                            "pending_action": {
351                                "target": &action.target,
352                                "action_type": action.action_type.as_str(),
353                                "payload": &action.payload
354                            },
355                            "reserved_cost": reserved_cost,
356                            "triggered_at": now_millis_string()
357                        });
358
359                        // Write hold_request event into history (PIP-001 §11b, whitepaper §3 inv 6).
360                        let hold_action = Action {
361                            actor_id: action.actor_id.clone(),
362                            action_type: ActionType::Mutate,
363                            target: format!("ledger/hold/{hold_id}"),
364                            payload: hold_payload.clone(),
365                            timestamp: None,
366                        };
367                        let mut hold_event = EventRecord {
368                            id: Uuid::new_v4().to_string(),
369                            log_index: 0,
370                            event_hash: String::new(),
371                            actor_id: action.actor_id.clone(),
372                            action_type: "hold_request".to_string(),
373                            target: format!("ledger/hold/{hold_id}"),
374                            payload: hold_payload.clone(),
375                            payload_hash: payload_hash_hex(&hold_action)?,
376                            artifact_hash: None,
377                            reserved_energy: reserved_cost,
378                            settled_energy: 0,
379                            timestamp: now_millis_string(),
380                        };
381
382                        // Atomic: reserve + hold_request + event in one transaction (PIP-001 §11b/§11c).
383                        let pool = self.state_store.pool();
384                        let mut tx = pool.begin().await?;
385                        if reserved_cost > 0 {
386                            self.energy_ledger
387                                .reserve_in_tx(&mut tx, &action.actor_id, reserved_cost)
388                                .await?;
389                        }
390                        self.event_log
391                            .append_in_tx(&mut tx, &mut hold_event)
392                            .await?;
393                        self.envelope_store
394                            .create_hold_request_in_tx(
395                                &mut tx,
396                                &NewHoldRequest {
397                                    hold_id: &hold_id,
398                                    envelope_id: &env.envelope_id,
399                                    agent_id: &action.actor_id,
400                                    trigger_target: &action.target,
401                                    trigger_action: action.action_type.as_str(),
402                                    pending_payload: &json!({
403                                        "target": &action.target,
404                                        "action_type": action.action_type.as_str(),
405                                        "payload": &action.payload,
406                                        "reserved_cost": reserved_cost
407                                    }),
408                                },
409                            )
410                            .await?;
411                        // Envelope stays Active — agent can continue submitting (PIP-001 §11b).
412
413                        // Audit trail — atomic with event (whitepaper §3 invariant 5).
414                        let log_index = hold_event.log_index as u64;
415                        self.audit_log
416                            .append_leaf_in_tx(&mut tx, log_index, &hold_event.event_hash)
417                            .await
418                            .map_err(|e| KernelError::Audit(e.to_string()))?;
419                        // Checkpoint generated lazily on read, not here.
420                        tx.commit().await?;
421
422                        return Err(KernelError::HoldTriggered {
423                            hold_id,
424                            agent_id: action.actor_id.clone(),
425                        });
426                    }
427                }
428            }
429        }
430
431        // PIP-001 §11d: Check for hold_response (approve/reject) operations.
432        // Pattern: mutate "ledger/hold/<hold_id>" with payload { decision, instruction? }
433        let hold_response = if matches!(action.action_type, ActionType::Mutate) {
434            parse_hold_response(&action.target, &action.payload)
435        } else {
436            None
437        };
438
439        // Validate and execute hold_response before entering the normal pipeline.
440        if let Some((ref hold_id, ref decision, ref instruction)) = hold_response {
441            return self
442                .execute_hold_response(&action, hold_id, decision, instruction.as_deref())
443                .await;
444        }
445
446        // Phase 4a: Check for lifecycle operations (freeze/unfreeze/terminate).
447        let lifecycle_op = if matches!(action.action_type, ActionType::Mutate) {
448            lifecycle::parse_lifecycle_op(&action.target, &action.payload)
449        } else {
450            None
451        };
452
453        // Validate lifecycle authorization before proceeding.
454        if let Some((ref target_actor_id, ref op)) = lifecycle_op {
455            let initiator = self
456                .actor_store
457                .get(&action.actor_id)
458                .await?
459                .ok_or_else(|| KernelError::ActorNotFound(action.actor_id.clone()))?;
460            let target_actor = self
461                .actor_store
462                .get(target_actor_id)
463                .await?
464                .ok_or_else(|| KernelError::ActorNotFound(target_actor_id.clone()))?;
465            lifecycle::validate_lifecycle_authorization(&initiator, &target_actor, op).await?;
466        }
467
468        let create_actor_spec = parse_create_actor_spec(&action, &self.actor_store).await?;
469        let create_envelope_spec =
470            parse_create_envelope_spec(&action, &self.envelope_store).await?;
471        let policy_version = parse_policy_version(&action);
472
473        // Step 2: QUOTE + Step 3: RESERVE
474        // PIP-001 §11d: Hold-approved actions already have energy reserved at trigger time.
475        // The sentinel `_hold_reserved_cost` carries the pre-reserved amount so we skip
476        // double-charging. We create a phantom EnergyReservation so settle correctly
477        // reduces reserved_energy without calling reserve() again.
478        let (reserved_cost, reservation) = if let Some(hold_reserved) = action
479            .payload
480            .get("_hold_reserved_cost")
481            .and_then(|v| v.as_i64())
482        {
483            let phantom = if hold_reserved > 0 {
484                Some(EnergyReservation {
485                    actor_id: action.actor_id.clone(),
486                    reserved: hold_reserved,
487                })
488            } else {
489                None
490            };
491            (hold_reserved, phantom)
492        } else {
493            // All actions pay quote_cost (action_cost + append_cost).
494            // For observe: action_cost=0 but append_cost >= 1 (Landauer).
495            let cost = quote_cost(&action) as i64;
496            let res = if cost > 0 {
497                Some(self.energy_ledger.reserve(&action.actor_id, cost).await?)
498            } else {
499                None
500            };
501            (cost, res)
502        };
503
504        // Step 4: VALIDATE EXECUTE PAYLOAD (PIP-002 §2 — for Execute actions only)
505        // The kernel does not execute anything. It validates the actor-submitted
506        // result and records it. Actor executes, kernel records.
507        let artifact_hash = if matches!(action.action_type, ActionType::Execute) {
508            Some(Self::validate_execute_payload(&action.payload)?)
509        } else {
510            None
511        };
512
513        // Step 5: SETTLE
514        // For PIP-002: settled cost equals reserved cost (no post-execution IO adjustment).
515        let settled_cost = reserved_cost;
516
517        // Step 6: APPEND
518        let mut event = EventRecord {
519            id: Uuid::new_v4().to_string(),
520            log_index: 0,
521            event_hash: String::new(),
522            actor_id: action.actor_id.clone(),
523            action_type: action.action_type.as_str().to_string(),
524            target: action.target.clone(),
525            payload: action.payload.clone(),
526            payload_hash: payload_hash_hex(&action)?,
527            artifact_hash: artifact_hash.clone(),
528            reserved_energy: reserved_cost,
529            settled_energy: settled_cost,
530            timestamp: now_millis_string(),
531        };
532        self.finalize_energy_and_event(
533            reservation.as_ref(),
534            settled_cost,
535            create_actor_spec.as_ref(),
536            create_envelope_spec.as_ref(),
537            &mut event,
538        )
539        .await?;
540
541        // Step 7: POST-COMMIT — envelope budget consumption + checkpoints + policy + lifecycle
542        if let Some(ref version) = policy_version {
543            if let Err(err) = self.state_store.set_policy_version(version).await {
544                warn!(error = %err, "failed to record policy version after commit");
545            } else {
546                info!(policy_version = %version, event_id = %event.id, "policy version updated");
547            }
548        }
549
550        // Phase 4b: Post-commit envelope budget consumption and checkpoint (PIP-001 §11).
551        if action.action_type.is_state_changing()
552            && settled_cost > 0
553            && let Ok(Some(actor)) = self.actor_store.get(&action.actor_id).await
554            && actor.actor_type == ActorType::Agent
555            && let Ok(Some(envelope)) = self
556                .envelope_store
557                .get_active_for_actor(&action.actor_id)
558                .await
559        {
560            // Check checkpoint BEFORE consuming (so we use pre-consume state).
561            let checkpoint = consent::check_checkpoint(&envelope, settled_cost);
562
563            // Consume budget.
564            let pool = self.state_store.pool();
565            let mut tx = pool.begin().await?;
566            match self
567                .envelope_store
568                .consume_budget_in_tx(&mut tx, &envelope.envelope_id, settled_cost)
569                .await
570            {
571                Ok(_new_consumed) => {
572                    // Handle checkpoint levels (PIP-001 §11).
573                    match checkpoint {
574                        Some(CheckpointLevel::Halt) => {
575                            self.envelope_store
576                                .set_status_in_tx(
577                                    &mut tx,
578                                    &envelope.envelope_id,
579                                    &consent::EnvelopeStatus::Expired,
580                                )
581                                .await?;
582                            info!(
583                                envelope_id = %envelope.envelope_id,
584                                actor_id = %action.actor_id,
585                                "checkpoint: HALT — envelope budget exhausted"
586                            );
587                        }
588                        Some(CheckpointLevel::Report) => {
589                            info!(
590                                envelope_id = %envelope.envelope_id,
591                                actor_id = %action.actor_id,
592                                budget = envelope.budget,
593                                consumed = envelope.budget_consumed + settled_cost,
594                                "checkpoint: REPORT — summary logged"
595                            );
596                        }
597                        None => {}
598                    }
599                    tx.commit().await?;
600                }
601                Err(err) => {
602                    warn!(
603                        error = %err,
604                        "envelope budget consumption failed (event already committed)"
605                    );
606                    // Transaction is automatically rolled back on drop.
607                }
608            }
609        }
610
611        // Phase 4a: Execute lifecycle operation after event is committed.
612        if let Some((ref target_actor_id, ref op)) = lifecycle_op {
613            let pool = self.state_store.pool();
614            match op {
615                punkgo_core::actor::LifecycleOp::Freeze { .. } => {
616                    match lifecycle::execute_freeze(&self.actor_store, &pool, target_actor_id).await
617                    {
618                        Ok(frozen_ids) => {
619                            info!(
620                                target = %target_actor_id,
621                                cascade_count = frozen_ids.len(),
622                                "lifecycle: freeze executed"
623                            );
624                        }
625                        Err(err) => {
626                            warn!(error = %err, "lifecycle: freeze failed (event already committed)");
627                        }
628                    }
629                }
630                punkgo_core::actor::LifecycleOp::Unfreeze => {
631                    match lifecycle::execute_unfreeze(&self.actor_store, &pool, target_actor_id)
632                        .await
633                    {
634                        Ok(()) => {
635                            info!(target = %target_actor_id, "lifecycle: unfreeze executed");
636                        }
637                        Err(err) => {
638                            warn!(error = %err, "lifecycle: unfreeze failed (event already committed)");
639                        }
640                    }
641                }
642                punkgo_core::actor::LifecycleOp::Terminate { .. } => {
643                    // Terminate sets the actor to frozen permanently.
644                    // Orphan handling is a future enhancement — for now, children are cascade-frozen.
645                    match lifecycle::execute_freeze(&self.actor_store, &pool, target_actor_id).await
646                    {
647                        Ok(frozen_ids) => {
648                            info!(
649                                target = %target_actor_id,
650                                cascade_count = frozen_ids.len(),
651                                "lifecycle: terminate executed (cascade frozen)"
652                            );
653                        }
654                        Err(err) => {
655                            warn!(error = %err, "lifecycle: terminate failed (event already committed)");
656                        }
657                    }
658                }
659                punkgo_core::actor::LifecycleOp::UpdateEnergyShare { energy_share } => {
660                    match lifecycle::execute_update_energy_share(
661                        &self.actor_store,
662                        &pool,
663                        target_actor_id,
664                        *energy_share,
665                    )
666                    .await
667                    {
668                        Ok(()) => {
669                            info!(
670                                target = %target_actor_id,
671                                energy_share,
672                                "lifecycle: energy_share updated"
673                            );
674                        }
675                        Err(err) => {
676                            warn!(error = %err, "lifecycle: update_energy_share failed (event already committed)");
677                        }
678                    }
679                }
680            }
681        }
682
683        Ok(SubmitReceipt {
684            event_id: event.id,
685            log_index: event.log_index,
686            event_hash: event.event_hash,
687            reserved_cost,
688            settled_cost,
689            artifact_hash,
690        })
691    }
692
693    async fn finalize_energy_and_event(
694        &self,
695        reservation: Option<&EnergyReservation>,
696        settled_cost: i64,
697        create_actor: Option<&CreateActorSpec>,
698        create_envelope: Option<&(String, EnvelopeSpec, Option<String>)>,
699        event: &mut EventRecord,
700    ) -> KernelResult<()> {
701        let pool = self.state_store.pool();
702        let mut tx = pool.begin().await?;
703
704        // Energy settlement (if reservation exists).
705        if let Some(res) = reservation {
706            self.energy_ledger
707                .settle_in_tx(&mut tx, &res.actor_id, res.reserved, settled_cost)
708                .await?;
709        }
710
711        // Actor creation (if applicable).
712        if let Some(spec) = create_actor {
713            self.energy_ledger
714                .create_actor_in_tx(&mut tx, &spec.actor_id, spec.energy_balance)
715                .await?;
716            self.actor_store.create_in_tx(&mut tx, spec).await?;
717            info!(
718                created_actor = %spec.actor_id,
719                actor_type = spec.actor_type.as_str(),
720                energy_balance = spec.energy_balance,
721                "actor created in transaction"
722            );
723        }
724
725        // Envelope creation (if applicable, whitepaper §3 invariant 6).
726        if let Some((envelope_id, spec, parent_id)) = create_envelope {
727            self.envelope_store
728                .create_in_tx(&mut tx, envelope_id, spec, parent_id.as_deref())
729                .await?;
730            info!(
731                envelope_id = %envelope_id,
732                actor_id = %spec.actor_id,
733                grantor_id = %spec.grantor_id,
734                budget = spec.budget,
735                "envelope created in transaction"
736            );
737        }
738
739        // Event append.
740        self.event_log.append_in_tx(&mut tx, event).await?;
741
742        // Audit trail update — atomic with event (whitepaper §3 invariant 5).
743        let log_index = event.log_index as u64;
744        self.audit_log
745            .append_leaf_in_tx(&mut tx, log_index, &event.event_hash)
746            .await
747            .map_err(|e| KernelError::Audit(e.to_string()))?;
748
749        // Checkpoint is NOT generated on every event. It is a derived artifact
750        // (tree root computed from leaf hashes) and can be generated on-demand
751        // when queried (receipt, show, verify) or via explicit checkpoint command.
752        // This keeps the write path fast and lock-free for concurrent access.
753        tx.commit().await?;
754        info!(event_id = %event.id, log_index = event.log_index, "event committed");
755        Ok(())
756    }
757
758    /// PIP-002 §2+§3: Validate execute action payload.
759    ///
760    /// The kernel MUST reject an execute submission that is missing any required
761    /// field or uses an invalid OID format. Returns the artifact_hash on success.
762    fn validate_execute_payload(payload: &Value) -> KernelResult<String> {
763        Self::require_valid_oid(payload, "input_oid")?;
764        Self::require_valid_oid(payload, "output_oid")?;
765
766        if payload.get("exit_code").and_then(|v| v.as_i64()).is_none() {
767            return Err(KernelError::ExecutePayloadInvalid(
768                "missing or invalid exit_code (must be integer)".to_string(),
769            ));
770        }
771
772        let artifact_hash = Self::require_valid_oid(payload, "artifact_hash")?;
773        Ok(artifact_hash)
774    }
775
776    /// Validate that a payload field exists and matches OID format: `sha256:<64 hex chars>`.
777    fn require_valid_oid(payload: &Value, field: &str) -> KernelResult<String> {
778        let val = payload
779            .get(field)
780            .and_then(|v| v.as_str())
781            .ok_or_else(|| KernelError::ExecutePayloadInvalid(format!("missing {field}")))?;
782
783        if !val.starts_with("sha256:") || val.len() != 71 {
784            return Err(KernelError::ExecutePayloadInvalid(format!(
785                "{field} must be sha256:<64 hex chars>, got: {val}"
786            )));
787        }
788        let hex_part = &val[7..];
789        if !hex_part.chars().all(|c| c.is_ascii_hexdigit()) {
790            return Err(KernelError::ExecutePayloadInvalid(format!(
791                "{field} contains non-hex characters: {val}"
792            )));
793        }
794
795        Ok(val.to_string())
796    }
797
798    async fn read_query(&self, query: ReadQuery) -> KernelResult<Value> {
799        // Visibility boundary gate for read queries.
800        // Currently all-transparent. When multi-user collaboration is
801        // introduced, check_read_access will enforce readable scope per
802        // requester. See PIP-001 §8 (writable_targets).
803        let requester = query.requester_id.as_deref().unwrap_or("anonymous");
804        check_read_access(requester, &query.kind)?;
805
806        match query.kind.as_str() {
807            "health" => Ok(json!({ "status": "ok" })),
808            "actor_energy" => {
809                let actor_id = query.actor_id.ok_or_else(|| {
810                    KernelError::InvalidRequest("actor_id is required for actor_energy".to_string())
811                })?;
812                let (energy_balance, reserved_energy) =
813                    self.energy_ledger.balance_view(&actor_id).await?;
814                Ok(json!({
815                    "actor_id": actor_id,
816                    "energy_balance": energy_balance,
817                    "reserved_energy": reserved_energy
818                }))
819            }
820            "events" => {
821                let limit = query.limit.unwrap_or(20).clamp(1, 500);
822                let events = self
823                    .event_log
824                    .query(
825                        query.actor_id.as_deref(),
826                        query.before_index,
827                        query.after_index,
828                        limit,
829                    )
830                    .await?;
831                // Pagination metadata: smallest log_index in result set.
832                // Client passes this as `before_index` to fetch the next page.
833                let next_cursor = events.last().map(|e| e.log_index);
834                let has_more = events.len() as i64 == limit;
835                Ok(json!({
836                    "events": events,
837                    "has_more": has_more,
838                    "next_cursor": next_cursor
839                }))
840            }
841            "stats" => {
842                let event_count = self.event_log.count().await?;
843                Ok(json!({ "event_count": event_count }))
844            }
845            "snapshot" => {
846                // Legacy: snapshot is superseded by audit checkpoint.
847                // Return audit checkpoint data for backward compatibility.
848                let event_count = self.event_log.count().await?;
849                self.audit_log
850                    .ensure_checkpoint(event_count as u64)
851                    .await
852                    .map_err(|e| KernelError::Audit(e.to_string()))?;
853                let cp = self
854                    .audit_log
855                    .latest_checkpoint()
856                    .await
857                    .map_err(|e| KernelError::Audit(e.to_string()))?;
858                Ok(json!({
859                    "event_count": event_count,
860                    "snapshot_hash": cp.root_hash,
861                    "generated_at": cp.created_at
862                }))
863            }
864            "paths" => {
865                let paths = self.state_store.paths();
866                Ok(json!({
867                    "root": paths.root.display().to_string(),
868                    "workspace_root": paths.workspace_root.display().to_string(),
869                    "quarantine_root": paths.quarantine_root.display().to_string(),
870                    "db_path": paths.db_path.display().to_string()
871                }))
872            }
873            "audit_checkpoint" => {
874                let event_count = self.event_log.count().await?;
875                self.audit_log
876                    .ensure_checkpoint(event_count as u64)
877                    .await
878                    .map_err(|e| KernelError::Audit(e.to_string()))?;
879                let cp = self
880                    .audit_log
881                    .latest_checkpoint()
882                    .await
883                    .map_err(|e| KernelError::Audit(e.to_string()))?;
884                Ok(serde_json::to_value(cp)?)
885            }
886            "audit_inclusion_proof" => {
887                let log_index = query.log_index.ok_or_else(|| {
888                    KernelError::InvalidRequest(
889                        "log_index is required for audit_inclusion_proof".to_string(),
890                    )
891                })? as u64;
892                let tree_size = match query.tree_size {
893                    Some(s) => s as u64,
894                    None => {
895                        // Ensure checkpoint is current before deriving tree_size.
896                        let event_count = self.event_log.count().await? as u64;
897                        self.audit_log
898                            .ensure_checkpoint(event_count)
899                            .await
900                            .map_err(|e| KernelError::Audit(e.to_string()))?;
901                        self.audit_log
902                            .tree_size()
903                            .await
904                            .map_err(|e| KernelError::Audit(e.to_string()))?
905                    }
906                };
907                let proof = self
908                    .audit_log
909                    .inclusion_proof(log_index, tree_size)
910                    .await
911                    .map_err(|e| KernelError::Audit(e.to_string()))?;
912                Ok(json!({
913                    "log_index": log_index,
914                    "tree_size": tree_size,
915                    "proof": proof
916                }))
917            }
918            "audit_consistency_proof" => {
919                let old_size = query.old_size.ok_or_else(|| {
920                    KernelError::InvalidRequest(
921                        "old_size is required for audit_consistency_proof".to_string(),
922                    )
923                })? as u64;
924                let new_size = query.tree_size.ok_or_else(|| {
925                    KernelError::InvalidRequest(
926                        "tree_size is required for audit_consistency_proof".to_string(),
927                    )
928                })? as u64;
929                let proof = self
930                    .audit_log
931                    .consistency_proof(old_size, new_size)
932                    .await
933                    .map_err(|e| KernelError::Audit(e.to_string()))?;
934                Ok(json!({
935                    "old_size": old_size,
936                    "new_size": new_size,
937                    "proof": proof
938                }))
939            }
940            // Phase 2: stellar configuration read query.
941            "stellar_info" => {
942                let config = &self.stellar_config;
943                Ok(json!({
944                    "gpu_model": config.gpu_model,
945                    "cpu_model": config.cpu_model,
946                    "int8_tops": config.int8_tops,
947                    "energy_per_tick": config.effective_energy_per_tick(),
948                    "tick_interval_ms": config.tick_interval_ms,
949                    "luminosity_source": config.luminosity_source
950                }))
951            }
952            // Phase 4b: envelope_info read query — retrieve active envelope for an actor.
953            "envelope_info" => {
954                let actor_id = query.actor_id.ok_or_else(|| {
955                    KernelError::InvalidRequest(
956                        "actor_id is required for envelope_info".to_string(),
957                    )
958                })?;
959                let envelope = self.envelope_store.get_active_for_actor(&actor_id).await?;
960                match envelope {
961                    Some(record) => Ok(serde_json::to_value(record)?),
962                    None => Ok(json!({ "actor_id": actor_id, "envelope": null })),
963                }
964            }
965            // Phase 1: actor_info read query — retrieve actor record.
966            "actor_info" => {
967                let actor_id = query.actor_id.ok_or_else(|| {
968                    KernelError::InvalidRequest("actor_id is required for actor_info".to_string())
969                })?;
970                let actor = self.actor_store.get(&actor_id).await?;
971                match actor {
972                    Some(record) => Ok(serde_json::to_value(record)?),
973                    None => Err(KernelError::ActorNotFound(actor_id)),
974                }
975            }
976            // PIP-001 §11: hold_info — retrieve a single hold_request by hold_id.
977            "hold_info" => {
978                let hold_id = query.hold_id.ok_or_else(|| {
979                    KernelError::InvalidRequest("hold_id is required for hold_info".to_string())
980                })?;
981                let hold = self.envelope_store.get_hold_request(&hold_id).await?;
982                match hold {
983                    Some(record) => Ok(record),
984                    None => Err(KernelError::InvalidRequest(format!(
985                        "hold_request not found: {hold_id}"
986                    ))),
987                }
988            }
989            // PIP-001 §11: holds_pending — list pending hold_requests, optionally filtered by agent.
990            "holds_pending" => {
991                let holds = self
992                    .envelope_store
993                    .list_pending_holds(query.actor_id.as_deref())
994                    .await?;
995                Ok(json!({ "holds": holds }))
996            }
997            other => Err(KernelError::InvalidRequest(format!(
998                "unsupported read query kind: {other}"
999            ))),
1000        }
1001    }
1002}
1003
1004fn now_millis_string() -> String {
1005    let now = std::time::SystemTime::now()
1006        .duration_since(std::time::UNIX_EPOCH)
1007        .unwrap_or_default();
1008    now.as_millis().to_string()
1009}
1010
1011fn now_millis_u64() -> u64 {
1012    std::time::SystemTime::now()
1013        .duration_since(std::time::UNIX_EPOCH)
1014        .unwrap_or_default()
1015        .as_millis() as u64
1016}
1017
1018/// If the action is a `system/policy` Create, extract the policy version string.
1019/// Returns `None` for all other actions.
1020fn parse_policy_version(action: &Action) -> Option<String> {
1021    if !matches!(action.action_type, ActionType::Create) || action.target != "system/policy" {
1022        return None;
1023    }
1024    action
1025        .payload
1026        .get("version")
1027        .and_then(|v| v.as_str())
1028        .map(|s| s.to_string())
1029}
1030
1031/// Phase 1: Parse actor creation spec from a `ledger/actor` Create action.
1032///
1033/// Backward compatible: old format `{actor_id, energy_balance}` is supported
1034/// by defaulting to actor_type=agent, purpose="legacy", writable_targets=[].
1035///
1036/// New format adds: actor_type, purpose, writable_targets, energy_share.
1037async fn parse_create_actor_spec(
1038    action: &Action,
1039    actor_store: &ActorStore,
1040) -> KernelResult<Option<CreateActorSpec>> {
1041    if !matches!(action.action_type, ActionType::Create) || action.target != "ledger/actor" {
1042        return Ok(None);
1043    }
1044
1045    let payload = action.payload.as_object().ok_or_else(|| {
1046        KernelError::InvalidRequest("seed actor payload must be an object".to_string())
1047    })?;
1048
1049    // Required: actor_id (or derive from purpose)
1050    let explicit_id = payload.get("actor_id").and_then(Value::as_str);
1051    let purpose = payload
1052        .get("purpose")
1053        .and_then(Value::as_str)
1054        .unwrap_or("legacy");
1055    let energy_balance = payload
1056        .get("energy_balance")
1057        .and_then(Value::as_i64)
1058        .unwrap_or(1000);
1059
1060    // Actor type: default to "agent" for backward compat
1061    let actor_type_str = payload
1062        .get("actor_type")
1063        .and_then(Value::as_str)
1064        .unwrap_or("agent");
1065    let actor_type = ActorType::parse(actor_type_str).ok_or_else(|| {
1066        KernelError::InvalidRequest(format!("invalid actor_type: {actor_type_str}"))
1067    })?;
1068
1069    // Derive ID if not explicitly provided
1070    let actor_id = if let Some(id) = explicit_id {
1071        id.to_string()
1072    } else {
1073        if purpose == "legacy" {
1074            return Err(KernelError::InvalidRequest(
1075                "actor_id or purpose is required to create an actor".to_string(),
1076            ));
1077        }
1078        let seq = actor_store.next_sequence(&action.actor_id, purpose).await?;
1079        derive_agent_id(&action.actor_id, purpose, seq)
1080    };
1081
1082    // Build lineage from creator (PIP-001 §7) + enforce §5 (only humans create agents)
1083    let creator_record = actor_store.get(&action.actor_id).await?;
1084    let (creator_type, creator_lineage) = match &creator_record {
1085        Some(record) => (record.actor_type.clone(), record.lineage.clone()),
1086        // Fallback for legacy actors not yet in actors table
1087        None => (ActorType::Human, vec![]),
1088    };
1089
1090    // PIP-001 §5: Only Humans can create Agents. Reject Agent as creator.
1091    if creator_type == ActorType::Agent && actor_type == ActorType::Agent {
1092        return Err(KernelError::PolicyViolation(
1093            "agents cannot create agents — creation right belongs to humans (PIP-001 §5)"
1094                .to_string(),
1095        ));
1096    }
1097
1098    let lineage = build_lineage(&creator_type, &action.actor_id, &creator_lineage);
1099
1100    // Writable targets: parse from payload or default to empty
1101    let writable_targets: Vec<WritableTarget> = if let Some(targets_val) =
1102        payload.get("writable_targets")
1103    {
1104        serde_json::from_value(targets_val.clone())
1105            .map_err(|e| KernelError::InvalidRequest(format!("invalid writable_targets: {e}")))?
1106    } else {
1107        vec![]
1108    };
1109
1110    // Phase 3 PIP-001 §11: validate child targets are subset of creator's boundary.
1111    if !writable_targets.is_empty()
1112        && let Some(ref creator) = creator_record
1113    {
1114        validate_child_targets(creator, &writable_targets)?;
1115    }
1116
1117    // Energy share: default 0.0
1118    let energy_share = payload
1119        .get("energy_share")
1120        .and_then(Value::as_f64)
1121        .unwrap_or(0.0);
1122
1123    let reduction_policy = payload
1124        .get("reduction_policy")
1125        .and_then(Value::as_str)
1126        .unwrap_or("none")
1127        .to_string();
1128
1129    Ok(Some(CreateActorSpec {
1130        actor_id,
1131        actor_type,
1132        creator_id: action.actor_id.clone(),
1133        lineage,
1134        purpose: Some(purpose.to_string()),
1135        writable_targets,
1136        energy_balance,
1137        energy_share,
1138        reduction_policy,
1139    }))
1140}
1141
1142/// Phase 4b: Parse envelope creation spec from a `ledger/envelope` Create action.
1143///
1144/// Returns (envelope_id, EnvelopeSpec, parent_envelope_id) if applicable.
1145/// The grantor is always the action's actor_id (the human or parent agent granting
1146/// the envelope).
1147///
1148/// PIP-001 §11: If the grantor is an agent, their active envelope is used as the parent
1149/// and reduction validation is performed.
1150async fn parse_create_envelope_spec(
1151    action: &Action,
1152    envelope_store: &EnvelopeStore,
1153) -> KernelResult<Option<(String, EnvelopeSpec, Option<String>)>> {
1154    if !matches!(action.action_type, ActionType::Create) || action.target != "ledger/envelope" {
1155        return Ok(None);
1156    }
1157
1158    let payload = action.payload.as_object().ok_or_else(|| {
1159        KernelError::InvalidRequest("envelope payload must be an object".to_string())
1160    })?;
1161
1162    let actor_id = payload
1163        .get("actor_id")
1164        .and_then(Value::as_str)
1165        .ok_or_else(|| {
1166            KernelError::InvalidRequest("actor_id is required to create an envelope".to_string())
1167        })?
1168        .to_string();
1169
1170    let budget = payload
1171        .get("budget")
1172        .and_then(Value::as_i64)
1173        .ok_or_else(|| {
1174            KernelError::InvalidRequest("budget is required to create an envelope".to_string())
1175        })?;
1176
1177    if budget <= 0 {
1178        return Err(KernelError::InvalidRequest(
1179            "envelope budget must be positive".to_string(),
1180        ));
1181    }
1182
1183    let targets: Vec<String> = if let Some(targets_val) = payload.get("targets") {
1184        serde_json::from_value(targets_val.clone())
1185            .map_err(|e| KernelError::InvalidRequest(format!("invalid envelope targets: {e}")))?
1186    } else {
1187        return Err(KernelError::InvalidRequest(
1188            "targets are required to create an envelope".to_string(),
1189        ));
1190    };
1191
1192    let actions: Vec<String> = if let Some(actions_val) = payload.get("actions") {
1193        serde_json::from_value(actions_val.clone())
1194            .map_err(|e| KernelError::InvalidRequest(format!("invalid envelope actions: {e}")))?
1195    } else {
1196        return Err(KernelError::InvalidRequest(
1197            "actions are required to create an envelope".to_string(),
1198        ));
1199    };
1200
1201    let duration_secs = payload.get("duration_secs").and_then(Value::as_i64);
1202    let report_every = payload.get("report_every").and_then(Value::as_i64);
1203    let hold_timeout_secs = payload.get("hold_timeout_secs").and_then(Value::as_i64);
1204
1205    // PIP-001 §11a: Parse hold_on rules.
1206    let hold_on: Vec<HoldRule> = if let Some(hold_on_val) = payload.get("hold_on") {
1207        serde_json::from_value(hold_on_val.clone())
1208            .map_err(|e| KernelError::InvalidRequest(format!("invalid hold_on rules: {e}")))?
1209    } else {
1210        vec![]
1211    };
1212
1213    let spec = EnvelopeSpec {
1214        actor_id,
1215        grantor_id: action.actor_id.clone(),
1216        budget,
1217        targets,
1218        actions,
1219        duration_secs,
1220        report_every,
1221        hold_on,
1222        hold_timeout_secs,
1223    };
1224
1225    // PIP-001 §11: If grantor is an agent with an active envelope, validate reduction.
1226    let parent_envelope_id = if let Some(grantor_envelope) = envelope_store
1227        .get_active_for_actor(&action.actor_id)
1228        .await?
1229    {
1230        consent::validate_envelope_reduction(&grantor_envelope, &spec)?;
1231        Some(grantor_envelope.envelope_id)
1232    } else {
1233        None
1234    };
1235
1236    let envelope_id = Uuid::new_v4().to_string();
1237
1238    Ok(Some((envelope_id, spec, parent_envelope_id)))
1239}
1240
1241// ---------------------------------------------------------------------------
1242// PIP-001 §11d: Hold response parsing and execution
1243// ---------------------------------------------------------------------------
1244
1245/// Parse a `ledger/hold/<hold_id>` mutate target into (hold_id, decision, instruction).
1246///
1247/// Returns `None` for any other target.
1248fn parse_hold_response(target: &str, payload: &Value) -> Option<(String, String, Option<String>)> {
1249    // Pattern: "ledger/hold/<hold_id>"
1250    let rest = target.strip_prefix("ledger/hold/")?;
1251    // Ensure there is a non-empty hold_id segment and nothing after it.
1252    if rest.is_empty() || rest.contains('/') {
1253        return None;
1254    }
1255    let hold_id = rest.to_string();
1256
1257    let decision = payload.get("decision").and_then(Value::as_str)?.to_string();
1258
1259    // Validate decision value.
1260    if decision != "approve" && decision != "reject" {
1261        return None;
1262    }
1263
1264    let instruction = payload
1265        .get("instruction")
1266        .and_then(Value::as_str)
1267        .map(|s| s.to_string());
1268
1269    Some((hold_id, decision, instruction))
1270}
1271
1272impl Kernel {
1273    /// PIP-001 §11d: Execute a hold_response (approve or reject).
1274    ///
1275    /// Validates that:
1276    /// 1. The caller is a Human actor (whitepaper §2: Human sovereignty).
1277    /// 2. The hold_request exists and is still pending.
1278    /// 3. The hold envelope belongs to an Agent actor.
1279    ///
1280    /// On approve: re-executes the pending action and writes a `hold_response` event.
1281    /// On reject: discards the pending action and writes a `hold_response` event.
1282    async fn execute_hold_response(
1283        &self,
1284        human_action: &Action,
1285        hold_id: &str,
1286        decision: &str,
1287        instruction: Option<&str>,
1288    ) -> KernelResult<SubmitReceipt> {
1289        // --- Validate caller is Human ---
1290        let caller = self
1291            .actor_store
1292            .get(&human_action.actor_id)
1293            .await?
1294            .ok_or_else(|| KernelError::ActorNotFound(human_action.actor_id.clone()))?;
1295
1296        if caller.actor_type != punkgo_core::actor::ActorType::Human {
1297            return Err(KernelError::PolicyViolation(format!(
1298                "only Human actors can resolve holds; caller {} is {:?}",
1299                human_action.actor_id, caller.actor_type
1300            )));
1301        }
1302
1303        // --- Load hold_request ---
1304        let hold_record = self
1305            .envelope_store
1306            .get_hold_request(hold_id)
1307            .await?
1308            .ok_or_else(|| {
1309                KernelError::PolicyViolation(format!("hold_request not found: {hold_id}"))
1310            })?;
1311
1312        let _envelope_id = hold_record
1313            .get("envelope_id")
1314            .and_then(Value::as_str)
1315            .ok_or_else(|| {
1316                KernelError::PolicyViolation("hold_request missing envelope_id".to_string())
1317            })?
1318            .to_string();
1319
1320        let agent_id = hold_record
1321            .get("agent_id")
1322            .and_then(Value::as_str)
1323            .ok_or_else(|| {
1324                KernelError::PolicyViolation("hold_request missing agent_id".to_string())
1325            })?
1326            .to_string();
1327
1328        let pending_payload = hold_record
1329            .get("pending_payload")
1330            .cloned()
1331            .unwrap_or(Value::Null);
1332
1333        let trigger_target = hold_record
1334            .get("trigger_target")
1335            .and_then(Value::as_str)
1336            .unwrap_or("")
1337            .to_string();
1338
1339        let trigger_action = hold_record
1340            .get("trigger_action")
1341            .and_then(Value::as_str)
1342            .unwrap_or("")
1343            .to_string();
1344
1345        let status = hold_record
1346            .get("status")
1347            .and_then(Value::as_str)
1348            .unwrap_or("");
1349        if status != "pending" {
1350            return Err(KernelError::PolicyViolation(format!(
1351                "hold_request {hold_id} is already resolved (status={status})"
1352            )));
1353        }
1354
1355        // PIP-001 §11e: Check if this hold has already timed out.
1356        // If so, run the lazy expiry first — it will auto-reject the hold.
1357        let envelope_id_str = hold_record
1358            .get("envelope_id")
1359            .and_then(Value::as_str)
1360            .unwrap_or("")
1361            .to_string();
1362        if let Ok(Some(env)) = self.envelope_store.get(&envelope_id_str).await {
1363            if let Some(timeout_secs) = env.hold_timeout_secs {
1364                if timeout_secs > 0 {
1365                    let triggered_at: u64 = hold_record
1366                        .get("triggered_at")
1367                        .and_then(|v| v.as_str())
1368                        .and_then(|s| s.parse().ok())
1369                        .unwrap_or(0);
1370                    let now = now_millis_u64();
1371                    if now.saturating_sub(triggered_at) > (timeout_secs as u64) * 1000 {
1372                        // Expire this hold first, then return error.
1373                        self.expire_timed_out_holds(&envelope_id_str, timeout_secs)
1374                            .await?;
1375                        return Err(KernelError::PolicyViolation(format!(
1376                            "hold_request {hold_id} has timed out and was auto-rejected"
1377                        )));
1378                    }
1379                }
1380            }
1381        }
1382
1383        // --- Build hold_response event payload (PIP-001 §11d) ---
1384        let response_payload = json!({
1385            "hold_id": hold_id,
1386            "agent_id": &agent_id,
1387            "decision": decision,
1388            "instruction": instruction,
1389            "trigger": {
1390                "target": &trigger_target,
1391                "action_type": &trigger_action
1392            },
1393            "resolved_by": &human_action.actor_id,
1394            "resolved_at": now_millis_string()
1395        });
1396
1397        let response_action = Action {
1398            actor_id: human_action.actor_id.clone(),
1399            action_type: ActionType::Mutate,
1400            target: format!("ledger/hold/{hold_id}"),
1401            payload: response_payload.clone(),
1402            timestamp: None,
1403        };
1404
1405        // --- Atomic transaction: resolve hold_request + settle energy + write event ---
1406        // Envelope stays Active throughout — no status change needed (PIP-001 §11b).
1407        let pool = self.state_store.pool();
1408        let mut tx = pool.begin().await?;
1409
1410        self.envelope_store
1411            .resolve_hold_request_in_tx(&mut tx, hold_id, decision, instruction)
1412            .await?;
1413
1414        // PIP-001 §11f: On reject, settle commitment cost (20% of reserved).
1415        // settle_in_tx(reserved=full, actual=commitment) releases the rest.
1416        let hold_reserved_cost = pending_payload
1417            .get("reserved_cost")
1418            .and_then(|v| v.as_i64())
1419            .unwrap_or(0);
1420
1421        let commitment_cost = if decision == "reject" && hold_reserved_cost > 0 {
1422            let cost = ((hold_reserved_cost as f64) * 0.2).ceil() as i64;
1423            self.energy_ledger
1424                .settle_in_tx(&mut tx, &agent_id, hold_reserved_cost, cost)
1425                .await?;
1426            cost
1427        } else {
1428            0
1429        };
1430
1431        let mut response_event = EventRecord {
1432            id: Uuid::new_v4().to_string(),
1433            log_index: 0,
1434            event_hash: String::new(),
1435            actor_id: human_action.actor_id.clone(),
1436            action_type: "hold_response".to_string(),
1437            target: format!("ledger/hold/{hold_id}"),
1438            payload: response_payload.clone(),
1439            payload_hash: payload_hash_hex(&response_action)?,
1440            artifact_hash: None,
1441            reserved_energy: hold_reserved_cost,
1442            settled_energy: commitment_cost,
1443            timestamp: now_millis_string(),
1444        };
1445        self.event_log
1446            .append_in_tx(&mut tx, &mut response_event)
1447            .await?;
1448
1449        // Audit trail — atomic with event (whitepaper §3 invariant 5).
1450        let log_index = response_event.log_index as u64;
1451        self.audit_log
1452            .append_leaf_in_tx(&mut tx, log_index, &response_event.event_hash)
1453            .await
1454            .map_err(|e| KernelError::Audit(e.to_string()))?;
1455        // Checkpoint generated lazily on read, not here.
1456        tx.commit().await?;
1457
1458        info!(
1459            hold_id = %hold_id,
1460            decision = %decision,
1461            agent_id = %agent_id,
1462            resolved_by = %human_action.actor_id,
1463            "PIP-001 §11d: hold resolved"
1464        );
1465
1466        // --- Approve: re-execute the pending action (PIP-001 §11d) ---
1467        if decision == "approve" {
1468            // Reconstruct the original action from the pending_payload stored in hold_request.
1469            let orig_target = pending_payload
1470                .get("target")
1471                .and_then(Value::as_str)
1472                .unwrap_or(&trigger_target)
1473                .to_string();
1474            let orig_action_type_str = pending_payload
1475                .get("action_type")
1476                .and_then(Value::as_str)
1477                .unwrap_or(&trigger_action)
1478                .to_string();
1479            let mut orig_payload = pending_payload
1480                .get("payload")
1481                .cloned()
1482                .unwrap_or(Value::Null);
1483
1484            // PIP-001 §11d: Energy was already reserved at hold trigger time.
1485            // `reserved_cost` was stored in pending_payload by the hold trigger.
1486            // Inject `_hold_reserved_cost` so submit_action skips double quote/reserve.
1487
1488            // Mark the payload as hold-approved so the re-execution bypasses the hold check (PIP-001 §11d).
1489            // This sentinel prevents a re-trigger loop.
1490            if let Some(obj) = orig_payload.as_object_mut() {
1491                obj.insert("_hold_approved".to_string(), Value::Bool(true));
1492                obj.insert(
1493                    "_hold_reserved_cost".to_string(),
1494                    Value::Number(hold_reserved_cost.into()),
1495                );
1496                // PIP-001 §11d: If instruction is present, append it to the payload.
1497                if let Some(instr) = instruction {
1498                    obj.insert(
1499                        "_hold_instruction".to_string(),
1500                        Value::String(instr.to_string()),
1501                    );
1502                }
1503            } else {
1504                // If payload is not an object (e.g. null), wrap it.
1505                let mut obj = serde_json::Map::new();
1506                obj.insert("_hold_approved".to_string(), Value::Bool(true));
1507                obj.insert(
1508                    "_hold_reserved_cost".to_string(),
1509                    Value::Number(hold_reserved_cost.into()),
1510                );
1511                if let Some(instr) = instruction {
1512                    obj.insert(
1513                        "_hold_instruction".to_string(),
1514                        Value::String(instr.to_string()),
1515                    );
1516                }
1517                orig_payload = Value::Object(obj);
1518            }
1519
1520            let orig_action_type = match orig_action_type_str.as_str() {
1521                "observe" => ActionType::Observe,
1522                "create" => ActionType::Create,
1523                "mutate" => ActionType::Mutate,
1524                _ => ActionType::Execute,
1525            };
1526
1527            let pending_action = Action {
1528                actor_id: agent_id.clone(),
1529                action_type: orig_action_type,
1530                target: orig_target,
1531                payload: orig_payload,
1532                timestamp: None,
1533            };
1534
1535            info!(
1536                hold_id = %hold_id,
1537                agent_id = %agent_id,
1538                "PIP-001 §11d: re-executing approved pending action"
1539            );
1540
1541            // Re-submit the pending action on behalf of the agent.
1542            // This goes through the full pipeline (quota → reserve → execute → settle → append).
1543            // Use Box::pin to break the async recursion (submit_action → execute_hold_response → submit_action).
1544            return Box::pin(self.submit_action(pending_action)).await;
1545        }
1546
1547        // --- Reject: return a receipt with commitment cost (PIP-001 §11f) ---
1548        Ok(SubmitReceipt {
1549            event_id: response_event.id,
1550            log_index: response_event.log_index,
1551            event_hash: response_event.event_hash,
1552            reserved_cost: hold_reserved_cost,
1553            settled_cost: commitment_cost,
1554            artifact_hash: None,
1555        })
1556    }
1557
1558    /// PIP-001 §11e: Lazy expiry for timed-out holds.
1559    ///
1560    /// Called before hold-trigger checks in submit_action. Scans pending holds
1561    /// for the given envelope and auto-rejects any that have exceeded
1562    /// `hold_timeout_secs`. Same pattern as `is_envelope_expired()` — no
1563    /// background thread, just check-on-access.
1564    async fn expire_timed_out_holds(
1565        &self,
1566        envelope_id: &str,
1567        hold_timeout_secs: i64,
1568    ) -> KernelResult<()> {
1569        let holds = self
1570            .envelope_store
1571            .list_pending_holds_for_envelope(envelope_id)
1572            .await?;
1573        let now = now_millis_u64();
1574
1575        for hold in holds {
1576            let triggered_at: u64 = hold
1577                .get("triggered_at")
1578                .and_then(|v| v.as_str())
1579                .and_then(|s| s.parse().ok())
1580                .unwrap_or(0);
1581
1582            if now.saturating_sub(triggered_at) <= (hold_timeout_secs as u64) * 1000 {
1583                continue;
1584            }
1585
1586            // Timed out — treat as reject (PIP-001 §11e/§11f).
1587            let hold_id = hold.get("hold_id").and_then(|v| v.as_str()).unwrap_or("");
1588            let agent_id = hold.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
1589            let pending_payload = hold.get("pending_payload").cloned().unwrap_or(Value::Null);
1590            let reserved_cost = pending_payload
1591                .get("reserved_cost")
1592                .and_then(|v| v.as_i64())
1593                .unwrap_or(0);
1594
1595            let commitment_cost = if reserved_cost > 0 {
1596                ((reserved_cost as f64) * 0.2).ceil() as i64
1597            } else {
1598                0
1599            };
1600
1601            let timeout_payload = json!({
1602                "hold_id": hold_id,
1603                "agent_id": agent_id,
1604                "decision": "timed_out",
1605                "reserved_cost": reserved_cost,
1606                "commitment_cost": commitment_cost,
1607                "triggered_at": triggered_at.to_string(),
1608                "expired_at": now.to_string()
1609            });
1610            let timeout_action = Action {
1611                actor_id: agent_id.to_string(),
1612                action_type: ActionType::Mutate,
1613                target: format!("ledger/hold/{hold_id}"),
1614                payload: timeout_payload.clone(),
1615                timestamp: None,
1616            };
1617            let mut timeout_event = EventRecord {
1618                id: Uuid::new_v4().to_string(),
1619                log_index: 0,
1620                event_hash: String::new(),
1621                actor_id: agent_id.to_string(),
1622                action_type: "hold_timeout".to_string(),
1623                target: format!("ledger/hold/{hold_id}"),
1624                payload: timeout_payload,
1625                payload_hash: payload_hash_hex(&timeout_action)?,
1626                artifact_hash: None,
1627                reserved_energy: reserved_cost,
1628                settled_energy: commitment_cost,
1629                timestamp: now_millis_string(),
1630            };
1631
1632            let pool = self.state_store.pool();
1633            let mut tx = pool.begin().await?;
1634            self.envelope_store
1635                .resolve_hold_request_in_tx(&mut tx, hold_id, "timed_out", None)
1636                .await?;
1637            if reserved_cost > 0 {
1638                self.energy_ledger
1639                    .settle_in_tx(&mut tx, agent_id, reserved_cost, commitment_cost)
1640                    .await?;
1641            }
1642            self.event_log
1643                .append_in_tx(&mut tx, &mut timeout_event)
1644                .await?;
1645
1646            // Audit trail — atomic with event (whitepaper §3 invariant 5).
1647            let t_log_index = timeout_event.log_index as u64;
1648            self.audit_log
1649                .append_leaf_in_tx(&mut tx, t_log_index, &timeout_event.event_hash)
1650                .await
1651                .map_err(|e| KernelError::Audit(e.to_string()))?;
1652            // Checkpoint generated lazily on read, not here.
1653            tx.commit().await?;
1654
1655            info!(
1656                hold_id = %hold_id,
1657                agent_id = %agent_id,
1658                commitment_cost = commitment_cost,
1659                "PIP-001 §11e: hold auto-expired (timeout)"
1660            );
1661        }
1662        Ok(())
1663    }
1664}