Skip to main content

punkgo_kernel/runtime/
kernel.rs

1use std::path::PathBuf;
2
3use serde::{Deserialize, Serialize};
4use serde_json::{Value, json};
5use tracing::{info, warn};
6use uuid::Uuid;
7
8use crate::audit::AuditLog;
9use punkgo_core::action::{Action, ActionType, payload_hash_hex, quote_cost};
10use punkgo_core::actor::{
11    ActorType, CreateActorSpec, WritableTarget, build_lineage, derive_agent_id,
12};
13use punkgo_core::boundary::{check_writable_boundary, validate_child_targets};
14use punkgo_core::consent::{self, AuthorizationMode, CheckpointLevel, EnvelopeSpec, HoldRule};
15use punkgo_core::errors::{KernelError, KernelResult};
16use punkgo_core::policy::{check_read_access, validate_action};
17
18use super::lifecycle;
19use crate::state::{
20    ActorStore, EnergyLedger, EnergyReservation, EnvelopeStore, EventLog, EventRecord,
21    NewHoldRequest, StateStore,
22};
23use punkgo_core::protocol::{RequestEnvelope, RequestType, ResponseEnvelope};
24use punkgo_core::stellar::{StellarConfig, load_stellar_config};
25
/// Configuration for bootstrapping the kernel.
///
/// The [`Default`] implementation reads both fields from the environment
/// (`PUNKGO_STATE_DIR`, `PUNKGO_IPC_ENDPOINT`) and falls back to built-in
/// defaults when a variable is unset or not valid Unicode.
#[derive(Debug, Clone)]
pub struct KernelConfig {
    /// Directory holding the kernel's persistent state. `Kernel::bootstrap`
    /// hands it to `StateStore::bootstrap` and loads `stellar.toml` from it.
    pub state_dir: PathBuf,
    /// IPC endpoint string. By default a per-PID named pipe on Windows and a
    /// per-PID Unix domain socket path elsewhere.
    pub ipc_endpoint: String,
}
32
33impl Default for KernelConfig {
34    fn default() -> Self {
35        let state_dir = std::env::var("PUNKGO_STATE_DIR")
36            .map(PathBuf::from)
37            .unwrap_or_else(|_| default_state_dir());
38        let ipc_endpoint =
39            std::env::var("PUNKGO_IPC_ENDPOINT").unwrap_or_else(|_| default_ipc_endpoint());
40        Self {
41            state_dir,
42            ipc_endpoint,
43        }
44    }
45}
46
47/// Default state directory: ~/.punkgo/state
48///
49/// Falls back to `./state` if home directory cannot be determined.
50/// Default IPC endpoint — unique per PID.
51///
52/// Each daemon instance gets its own socket/pipe keyed by PID, enabling
53/// clean recovery after crashes (stale sockets are cleaned up by new daemons).
54///
55/// - Windows: `\\.\pipe\punkgo-kernel-{pid}` (named pipe)
56/// - Unix: `{state_dir}/daemon-{pid}.sock` (Unix domain socket)
57fn default_ipc_endpoint() -> String {
58    let pid = std::process::id();
59    if cfg!(windows) {
60        format!(r"\\.\pipe\punkgo-kernel-{pid}")
61    } else {
62        let state_dir = std::env::var("PUNKGO_STATE_DIR")
63            .map(std::path::PathBuf::from)
64            .unwrap_or_else(|_| default_state_dir());
65        state_dir
66            .join(format!("daemon-{pid}.sock"))
67            .to_string_lossy()
68            .into_owned()
69    }
70}
71
72fn default_state_dir() -> PathBuf {
73    if let Some(home) = home_dir() {
74        return home.join(".punkgo").join("state");
75    }
76    PathBuf::from("state")
77}
78
/// Best-effort lookup of the current user's home directory from the environment.
///
/// Candidates are tried in this order:
/// 1. `HOME` (Unix / WSL)
/// 2. `USERPROFILE` (Windows primary)
/// 3. `HOMEDRIVE` + `HOMEPATH` (Windows fallback)
///
/// A variable that is set but empty is treated as unset. An empty value names no
/// location, and accepting it would turn every path derived from the home
/// directory into a path relative to the current working directory.
///
/// Returns `None` when no candidate yields a non-empty value.
fn home_dir() -> Option<PathBuf> {
    // Read a variable, discarding values that are present but empty.
    let non_empty = |name: &str| std::env::var_os(name).filter(|value| !value.is_empty());

    if let Some(home) = non_empty("HOME").or_else(|| non_empty("USERPROFILE")) {
        return Some(PathBuf::from(home));
    }

    // Windows fallback: both halves are required to form a usable path.
    let drive = non_empty("HOMEDRIVE")?;
    let path = non_empty("HOMEPATH")?;
    let mut combined = PathBuf::from(drive);
    combined.push(path);
    Some(combined)
}
95
/// Cryptographic receipt returned after a successful action submission.
///
/// Contains the event ID, log index, event hash, and energy costs.
/// This is the caller's proof that their action was committed to the
/// append-only history.
#[derive(Debug, Clone, Serialize)]
pub struct SubmitReceipt {
    /// ID of the committed event (a UUID v4 string generated by the kernel).
    pub event_id: String,
    /// `log_index` of the committed event record.
    pub log_index: i64,
    /// `event_hash` of the committed event record.
    pub event_hash: String,
    /// Energy reserved for the action before it was appended.
    pub reserved_cost: i64,
    /// Energy actually settled. Currently always equal to `reserved_cost`.
    pub settled_cost: i64,
    /// Artifact hash produced by execute-payload validation; `None` for every
    /// action type other than `Execute`.
    pub artifact_hash: Option<String>,
}
110
/// Payload of a `Read` request, deserialized from JSON in `Kernel::dispatch`.
///
/// `kind` selects the query; the remaining fields are optional parameters whose
/// relevance depends on the selected kind.
#[derive(Debug, Deserialize)]
struct ReadQuery {
    /// Name of the query to run.
    /// NOTE(review): the accepted values are decided by `read_query` — confirm there.
    kind: String,
    /// Actor the query refers to — presumably only needed by actor-scoped kinds;
    /// TODO confirm against `read_query`.
    actor_id: Option<String>,
    /// PIP-001 §11: hold_id for hold_info queries.
    #[serde(default)]
    hold_id: Option<String>,
    /// Presumably the maximum number of results to return; TODO confirm against
    /// `read_query`.
    limit: Option<i64>,
    /// For audit_inclusion_proof: the event's log_index to prove.
    log_index: Option<i64>,
    /// For audit_inclusion_proof / audit_consistency_proof: the tree size to prove against.
    tree_size: Option<i64>,
    /// For audit_consistency_proof: the older tree size.
    old_size: Option<i64>,
    /// Pagination: only return events with log_index < this value.
    /// Used for backward pagination (fetching older events).
    #[serde(default)]
    before_index: Option<i64>,
    /// Pagination: only return events with log_index > this value.
    /// Used for forward pagination (fetching newer events).
    #[serde(default)]
    after_index: Option<i64>,
    /// Identity of the requester. Used by visibility boundary checks.
    /// Currently optional — when absent, all reads are permitted (single-user).
    /// When multi-user collaboration is introduced, this becomes required.
    #[serde(default)]
    requester_id: Option<String>,
}
139
/// The PunkGo kernel — single-writer append-only event system.
///
/// The kernel is the **committer** (whitepaper §2): a structural role that
/// provides a single linearization point for actions. It is not a judge.
///
/// Use [`Kernel::bootstrap`] to initialize from a state directory, then
/// [`Kernel::handle_request`] to process IPC requests.
pub struct Kernel {
    /// Owner of the SQLite state; every other store is built from its pool.
    state_store: StateStore,
    /// Energy reservation and settlement.
    energy_ledger: EnergyLedger,
    /// Event history that submitted actions are appended to.
    event_log: EventLog,
    /// Audit log that receives one leaf per appended event.
    audit_log: AuditLog,
    /// Actor records (status, type, lineage).
    actor_store: ActorStore,
    /// Authorization envelopes and hold requests.
    envelope_store: EnvelopeStore,
    /// Stellar configuration loaded from `stellar.toml` during bootstrap.
    stellar_config: StellarConfig,
}
156
157impl Kernel {
158    /// Initialize the kernel from a state directory.
159    ///
160    /// Creates the SQLite database, loads stellar config, and prepares the root actor.
161    pub async fn bootstrap(config: &KernelConfig) -> KernelResult<Self> {
162        let state_store = StateStore::bootstrap(&config.state_dir).await?;
163        let energy_ledger = EnergyLedger::new(state_store.pool());
164        let event_log = EventLog::new(state_store.pool());
165        let audit_log = AuditLog::new(state_store.pool(), "punkgo/kernel");
166        let actor_store = ActorStore::new(state_store.pool());
167        let envelope_store = EnvelopeStore::new(state_store.pool());
168
169        // Phase 2: Load stellar configuration (PIP-001 §1).
170        let stellar_config_path = config.state_dir.join("stellar.toml");
171        let stellar_config = load_stellar_config(&stellar_config_path)?;
172        info!(
173            energy_per_tick = stellar_config.effective_energy_per_tick(),
174            int8_tops = stellar_config.int8_tops,
175            tick_interval_ms = stellar_config.tick_interval_ms,
176            "stellar configuration loaded"
177        );
178
179        Ok(Self {
180            state_store,
181            energy_ledger,
182            event_log,
183            audit_log,
184            actor_store,
185            envelope_store,
186            stellar_config,
187        })
188    }
189
    /// Returns a reference to the stellar configuration loaded at bootstrap,
    /// for external use (e.g., energy producer startup).
    pub fn stellar_config(&self) -> &StellarConfig {
        &self.stellar_config
    }
195
    /// Returns a reference to the energy ledger for external use (e.g., energy producer).
    ///
    /// No clone is made; the borrow is tied to the kernel.
    pub fn energy_ledger(&self) -> &EnergyLedger {
        &self.energy_ledger
    }
200
    /// Returns a reference to the actor store for external use (e.g., energy producer).
    ///
    /// No clone is made; the borrow is tied to the kernel.
    pub fn actor_store(&self) -> &ActorStore {
        &self.actor_store
    }
205
    /// Returns a reference to the envelope store for external use.
    ///
    /// The borrow is tied to the kernel, which keeps ownership of the store.
    pub fn envelope_store(&self) -> &EnvelopeStore {
        &self.envelope_store
    }
210
    /// Returns the SQLite pool for external use (e.g., energy producer transaction).
    ///
    /// The pool is returned by value, exactly as handed out by the state store.
    pub fn pool(&self) -> sqlx::SqlitePool {
        self.state_store.pool()
    }
215
216    /// Handle an incoming IPC request — dispatches to quote, submit, or read.
217    pub async fn handle_request(&self, req: RequestEnvelope) -> ResponseEnvelope {
218        let request_id = req.request_id.clone();
219        info!(
220            request_id = %request_id,
221            request_type = ?req.request_type,
222            "received request"
223        );
224        match self.dispatch(req).await {
225            Ok(payload) => {
226                info!(request_id = %request_id, "request completed");
227                ResponseEnvelope::ok(request_id, payload)
228            }
229            Err(err) => {
230                warn!(error = %err, "request failed");
231                ResponseEnvelope::err_structured(request_id, &err)
232            }
233        }
234    }
235
236    async fn dispatch(&self, req: RequestEnvelope) -> KernelResult<Value> {
237        match req.request_type {
238            RequestType::Quote => {
239                let action: Action = serde_json::from_value(req.payload)?;
240                validate_action(&action)?;
241                let cost = quote_cost(&action);
242                Ok(json!({ "cost": cost }))
243            }
244            RequestType::Submit => {
245                let action: Action = serde_json::from_value(req.payload)?;
246                let receipt = self.submit_action(action).await?;
247                Ok(serde_json::to_value(receipt)?)
248            }
249            RequestType::Read => {
250                let query: ReadQuery = serde_json::from_value(req.payload)?;
251                self.read_query(query).await
252            }
253        }
254    }
255
256    async fn submit_action(&self, action: Action) -> KernelResult<SubmitReceipt> {
257        // Step 1: VALIDATE — policy + actor existence + frozen check
258        validate_action(&action)?;
259        if !self.state_store.actor_exists(&action.actor_id).await? {
260            return Err(KernelError::ActorNotFound(action.actor_id.clone()));
261        }
262
263        // Phase 1: check frozen status via actors table.
264        // Phase 3: check writable boundary (PIP-001 §8/§9).
265        // Phase 4a: check lineage activity (PIP-001 §7) + lifecycle ops.
266        if let Some(actor) = self.actor_store.get(&action.actor_id).await? {
267            if actor.status == punkgo_core::actor::ActorStatus::Frozen
268                && action.action_type.is_state_changing()
269            {
270                return Err(KernelError::ActorFrozen(format!(
271                    "actor {} is frozen and cannot perform state-changing actions",
272                    action.actor_id
273                )));
274            }
275            // Phase 3: Boundary enforcement — check writable_targets.
276            check_writable_boundary(&actor, &action.target, &action.action_type)?;
277
278            // Phase 4a: Check lineage activity for agents (PIP-001 §7).
279            // Agents need their entire creation chain to be active.
280            if actor.actor_type == punkgo_core::actor::ActorType::Agent
281                && action.action_type.is_state_changing()
282            {
283                lifecycle::check_lineage_active(&self.actor_store, &actor.lineage).await?;
284            }
285
286            // Phase 4b: Authorization check (PIP-001 §5/§6/§11).
287            // Agents performing state-changing actions need an active envelope.
288            if action.action_type.is_state_changing() {
289                let envelope = self
290                    .envelope_store
291                    .get_active_for_actor(&action.actor_id)
292                    .await?;
293
294                // Lazy expiry check (PIP-001 §11 checkpoint): if envelope exists but is time-expired, mark it.
295                let envelope = if let Some(env) = envelope {
296                    if consent::is_envelope_expired(&env, now_millis_u64()) {
297                        self.envelope_store
298                            .set_status(&env.envelope_id, &consent::EnvelopeStatus::Expired)
299                            .await?;
300                        None
301                    } else {
302                        Some(env)
303                    }
304                } else {
305                    None
306                };
307
308                let auth_mode = consent::check_authorization(&actor, envelope.as_ref())?;
309
310                // For ManOnTheLoop, also verify envelope covers this specific action.
311                if auth_mode == AuthorizationMode::ManOnTheLoop
312                    && let Some(ref env) = envelope
313                {
314                    consent::check_envelope_covers(
315                        env,
316                        &action.target,
317                        action.action_type.as_str(),
318                    )?;
319
320                    // PIP-001 §11e: Lazy-expire timed-out holds before checking new triggers.
321                    if !env.hold_on.is_empty() {
322                        if let Some(timeout_secs) = env.hold_timeout_secs {
323                            if timeout_secs > 0 {
324                                self.expire_timed_out_holds(&env.envelope_id, timeout_secs)
325                                    .await?;
326                            }
327                        }
328                    }
329
330                    // PIP-001 §11b: Check hold_on rules — if triggered, create a hold_request
331                    // event and reserve energy before execution.
332                    // Skip the hold check if this action was already approved by a Human
333                    // (re-execution after approve; PIP-001 §11d).
334                    let hold_approved = action
335                        .payload
336                        .get("_hold_approved")
337                        .and_then(|v| v.as_bool())
338                        .unwrap_or(false);
339                    if !hold_approved
340                        && consent::check_hold_trigger(
341                            &env.hold_on,
342                            &action.target,
343                            action.action_type.as_str(),
344                        )
345                    {
346                        let hold_id = Uuid::new_v4().to_string();
347
348                        // PIP-001 §11c: Commit = reserve energy (gasLimit model).
349                        // Reserves energy upfront so agent can't overspend while hold is pending.
350                        let reserved_cost = quote_cost(&action) as i64;
351
352                        let hold_payload = json!({
353                            "hold_id": &hold_id,
354                            "agent_id": &action.actor_id,
355                            "trigger": {
356                                "target": &action.target,
357                                "action_type": action.action_type.as_str()
358                            },
359                            "pending_action": {
360                                "target": &action.target,
361                                "action_type": action.action_type.as_str(),
362                                "payload": &action.payload
363                            },
364                            "reserved_cost": reserved_cost,
365                            "triggered_at": now_millis_string()
366                        });
367
368                        // Write hold_request event into history (PIP-001 §11b, whitepaper §3 inv 6).
369                        let hold_action = Action {
370                            actor_id: action.actor_id.clone(),
371                            action_type: ActionType::Mutate,
372                            target: format!("ledger/hold/{hold_id}"),
373                            payload: hold_payload.clone(),
374                            timestamp: None,
375                        };
376                        let mut hold_event = EventRecord {
377                            id: Uuid::new_v4().to_string(),
378                            log_index: 0,
379                            event_hash: String::new(),
380                            actor_id: action.actor_id.clone(),
381                            action_type: "hold_request".to_string(),
382                            target: format!("ledger/hold/{hold_id}"),
383                            payload: hold_payload.clone(),
384                            payload_hash: payload_hash_hex(&hold_action)?,
385                            artifact_hash: None,
386                            reserved_energy: reserved_cost,
387                            settled_energy: 0,
388                            timestamp: now_millis_string(),
389                        };
390
391                        // Atomic: reserve + hold_request + event in one transaction (PIP-001 §11b/§11c).
392                        let pool = self.state_store.pool();
393                        let mut tx = pool.begin().await?;
394                        if reserved_cost > 0 {
395                            self.energy_ledger
396                                .reserve_in_tx(&mut tx, &action.actor_id, reserved_cost)
397                                .await?;
398                        }
399                        self.event_log
400                            .append_in_tx(&mut tx, &mut hold_event)
401                            .await?;
402                        self.envelope_store
403                            .create_hold_request_in_tx(
404                                &mut tx,
405                                &NewHoldRequest {
406                                    hold_id: &hold_id,
407                                    envelope_id: &env.envelope_id,
408                                    agent_id: &action.actor_id,
409                                    trigger_target: &action.target,
410                                    trigger_action: action.action_type.as_str(),
411                                    pending_payload: &json!({
412                                        "target": &action.target,
413                                        "action_type": action.action_type.as_str(),
414                                        "payload": &action.payload,
415                                        "reserved_cost": reserved_cost
416                                    }),
417                                },
418                            )
419                            .await?;
420                        // Envelope stays Active — agent can continue submitting (PIP-001 §11b).
421
422                        // Audit trail — atomic with event (whitepaper §3 invariant 5).
423                        let log_index = hold_event.log_index as u64;
424                        self.audit_log
425                            .append_leaf_in_tx(&mut tx, log_index, &hold_event.event_hash)
426                            .await
427                            .map_err(|e| KernelError::Audit(e.to_string()))?;
428                        // Checkpoint generated lazily on read, not here.
429                        tx.commit().await?;
430
431                        return Err(KernelError::HoldTriggered {
432                            hold_id,
433                            agent_id: action.actor_id.clone(),
434                        });
435                    }
436                }
437            }
438        }
439
440        // PIP-001 §11d: Check for hold_response (approve/reject) operations.
441        // Pattern: mutate "ledger/hold/<hold_id>" with payload { decision, instruction? }
442        let hold_response = if matches!(action.action_type, ActionType::Mutate) {
443            parse_hold_response(&action.target, &action.payload)
444        } else {
445            None
446        };
447
448        // Validate and execute hold_response before entering the normal pipeline.
449        if let Some((ref hold_id, ref decision, ref instruction)) = hold_response {
450            return self
451                .execute_hold_response(&action, hold_id, decision, instruction.as_deref())
452                .await;
453        }
454
455        // Phase 4a: Check for lifecycle operations (freeze/unfreeze/terminate).
456        let lifecycle_op = if matches!(action.action_type, ActionType::Mutate) {
457            lifecycle::parse_lifecycle_op(&action.target, &action.payload)
458        } else {
459            None
460        };
461
462        // Validate lifecycle authorization before proceeding.
463        if let Some((ref target_actor_id, ref op)) = lifecycle_op {
464            let initiator = self
465                .actor_store
466                .get(&action.actor_id)
467                .await?
468                .ok_or_else(|| KernelError::ActorNotFound(action.actor_id.clone()))?;
469            let target_actor = self
470                .actor_store
471                .get(target_actor_id)
472                .await?
473                .ok_or_else(|| KernelError::ActorNotFound(target_actor_id.clone()))?;
474            lifecycle::validate_lifecycle_authorization(&initiator, &target_actor, op).await?;
475        }
476
477        let create_actor_spec = parse_create_actor_spec(&action, &self.actor_store).await?;
478        let create_envelope_spec =
479            parse_create_envelope_spec(&action, &self.envelope_store).await?;
480        let policy_version = parse_policy_version(&action);
481
482        // Step 2: QUOTE + Step 3: RESERVE
483        // PIP-001 §11d: Hold-approved actions already have energy reserved at trigger time.
484        // The sentinel `_hold_reserved_cost` carries the pre-reserved amount so we skip
485        // double-charging. We create a phantom EnergyReservation so settle correctly
486        // reduces reserved_energy without calling reserve() again.
487        let (reserved_cost, reservation) = if let Some(hold_reserved) = action
488            .payload
489            .get("_hold_reserved_cost")
490            .and_then(|v| v.as_i64())
491        {
492            let phantom = if hold_reserved > 0 {
493                Some(EnergyReservation {
494                    actor_id: action.actor_id.clone(),
495                    reserved: hold_reserved,
496                })
497            } else {
498                None
499            };
500            (hold_reserved, phantom)
501        } else {
502            // All actions pay quote_cost (action_cost + append_cost).
503            // For observe: action_cost=0 but append_cost >= 1 (Landauer).
504            let cost = quote_cost(&action) as i64;
505            let res = if cost > 0 {
506                Some(self.energy_ledger.reserve(&action.actor_id, cost).await?)
507            } else {
508                None
509            };
510            (cost, res)
511        };
512
513        // Step 4: VALIDATE EXECUTE PAYLOAD (PIP-002 §2 — for Execute actions only)
514        // The kernel does not execute anything. It validates the actor-submitted
515        // result and records it. Actor executes, kernel records.
516        let artifact_hash = if matches!(action.action_type, ActionType::Execute) {
517            Some(Self::validate_execute_payload(&action.payload)?)
518        } else {
519            None
520        };
521
522        // Step 5: SETTLE
523        // For PIP-002: settled cost equals reserved cost (no post-execution IO adjustment).
524        let settled_cost = reserved_cost;
525
526        // Step 6: APPEND
527        let mut event = EventRecord {
528            id: Uuid::new_v4().to_string(),
529            log_index: 0,
530            event_hash: String::new(),
531            actor_id: action.actor_id.clone(),
532            action_type: action.action_type.as_str().to_string(),
533            target: action.target.clone(),
534            payload: action.payload.clone(),
535            payload_hash: payload_hash_hex(&action)?,
536            artifact_hash: artifact_hash.clone(),
537            reserved_energy: reserved_cost,
538            settled_energy: settled_cost,
539            timestamp: now_millis_string(),
540        };
541        self.finalize_energy_and_event(
542            reservation.as_ref(),
543            settled_cost,
544            create_actor_spec.as_ref(),
545            create_envelope_spec.as_ref(),
546            &mut event,
547        )
548        .await?;
549
550        // Step 7: POST-COMMIT — envelope budget consumption + checkpoints + policy + lifecycle
551        if let Some(ref version) = policy_version {
552            if let Err(err) = self.state_store.set_policy_version(version).await {
553                warn!(error = %err, "failed to record policy version after commit");
554            } else {
555                info!(policy_version = %version, event_id = %event.id, "policy version updated");
556            }
557        }
558
559        // Phase 4b: Post-commit envelope budget consumption and checkpoint (PIP-001 §11).
560        if action.action_type.is_state_changing()
561            && settled_cost > 0
562            && let Ok(Some(actor)) = self.actor_store.get(&action.actor_id).await
563            && actor.actor_type == ActorType::Agent
564            && let Ok(Some(envelope)) = self
565                .envelope_store
566                .get_active_for_actor(&action.actor_id)
567                .await
568        {
569            // Check checkpoint BEFORE consuming (so we use pre-consume state).
570            let checkpoint = consent::check_checkpoint(&envelope, settled_cost);
571
572            // Consume budget.
573            let pool = self.state_store.pool();
574            let mut tx = pool.begin().await?;
575            match self
576                .envelope_store
577                .consume_budget_in_tx(&mut tx, &envelope.envelope_id, settled_cost)
578                .await
579            {
580                Ok(_new_consumed) => {
581                    // Handle checkpoint levels (PIP-001 §11).
582                    match checkpoint {
583                        Some(CheckpointLevel::Halt) => {
584                            self.envelope_store
585                                .set_status_in_tx(
586                                    &mut tx,
587                                    &envelope.envelope_id,
588                                    &consent::EnvelopeStatus::Expired,
589                                )
590                                .await?;
591                            info!(
592                                envelope_id = %envelope.envelope_id,
593                                actor_id = %action.actor_id,
594                                "checkpoint: HALT — envelope budget exhausted"
595                            );
596                        }
597                        Some(CheckpointLevel::Report) => {
598                            info!(
599                                envelope_id = %envelope.envelope_id,
600                                actor_id = %action.actor_id,
601                                budget = envelope.budget,
602                                consumed = envelope.budget_consumed + settled_cost,
603                                "checkpoint: REPORT — summary logged"
604                            );
605                        }
606                        None => {}
607                    }
608                    tx.commit().await?;
609                }
610                Err(err) => {
611                    warn!(
612                        error = %err,
613                        "envelope budget consumption failed (event already committed)"
614                    );
615                    // Transaction is automatically rolled back on drop.
616                }
617            }
618        }
619
620        // Phase 4a: Execute lifecycle operation after event is committed.
621        if let Some((ref target_actor_id, ref op)) = lifecycle_op {
622            let pool = self.state_store.pool();
623            match op {
624                punkgo_core::actor::LifecycleOp::Freeze { .. } => {
625                    match lifecycle::execute_freeze(&self.actor_store, &pool, target_actor_id).await
626                    {
627                        Ok(frozen_ids) => {
628                            info!(
629                                target = %target_actor_id,
630                                cascade_count = frozen_ids.len(),
631                                "lifecycle: freeze executed"
632                            );
633                        }
634                        Err(err) => {
635                            warn!(error = %err, "lifecycle: freeze failed (event already committed)");
636                        }
637                    }
638                }
639                punkgo_core::actor::LifecycleOp::Unfreeze => {
640                    match lifecycle::execute_unfreeze(&self.actor_store, &pool, target_actor_id)
641                        .await
642                    {
643                        Ok(()) => {
644                            info!(target = %target_actor_id, "lifecycle: unfreeze executed");
645                        }
646                        Err(err) => {
647                            warn!(error = %err, "lifecycle: unfreeze failed (event already committed)");
648                        }
649                    }
650                }
651                punkgo_core::actor::LifecycleOp::Terminate { .. } => {
652                    // Terminate sets the actor to frozen permanently.
653                    // Orphan handling is a future enhancement — for now, children are cascade-frozen.
654                    match lifecycle::execute_freeze(&self.actor_store, &pool, target_actor_id).await
655                    {
656                        Ok(frozen_ids) => {
657                            info!(
658                                target = %target_actor_id,
659                                cascade_count = frozen_ids.len(),
660                                "lifecycle: terminate executed (cascade frozen)"
661                            );
662                        }
663                        Err(err) => {
664                            warn!(error = %err, "lifecycle: terminate failed (event already committed)");
665                        }
666                    }
667                }
668                punkgo_core::actor::LifecycleOp::UpdateEnergyShare { energy_share } => {
669                    match lifecycle::execute_update_energy_share(
670                        &self.actor_store,
671                        &pool,
672                        target_actor_id,
673                        *energy_share,
674                    )
675                    .await
676                    {
677                        Ok(()) => {
678                            info!(
679                                target = %target_actor_id,
680                                energy_share,
681                                "lifecycle: energy_share updated"
682                            );
683                        }
684                        Err(err) => {
685                            warn!(error = %err, "lifecycle: update_energy_share failed (event already committed)");
686                        }
687                    }
688                }
689            }
690        }
691
692        Ok(SubmitReceipt {
693            event_id: event.id,
694            log_index: event.log_index,
695            event_hash: event.event_hash,
696            reserved_cost,
697            settled_cost,
698            artifact_hash,
699        })
700    }
701
702    async fn finalize_energy_and_event(
703        &self,
704        reservation: Option<&EnergyReservation>,
705        settled_cost: i64,
706        create_actor: Option<&CreateActorSpec>,
707        create_envelope: Option<&(String, EnvelopeSpec, Option<String>)>,
708        event: &mut EventRecord,
709    ) -> KernelResult<()> {
710        let pool = self.state_store.pool();
711        let mut tx = pool.begin().await?;
712
713        // Energy settlement (if reservation exists).
714        if let Some(res) = reservation {
715            self.energy_ledger
716                .settle_in_tx(&mut tx, &res.actor_id, res.reserved, settled_cost)
717                .await?;
718        }
719
720        // Actor creation (if applicable).
721        if let Some(spec) = create_actor {
722            self.energy_ledger
723                .create_actor_in_tx(&mut tx, &spec.actor_id, spec.energy_balance)
724                .await?;
725            self.actor_store.create_in_tx(&mut tx, spec).await?;
726            info!(
727                created_actor = %spec.actor_id,
728                actor_type = spec.actor_type.as_str(),
729                energy_balance = spec.energy_balance,
730                "actor created in transaction"
731            );
732        }
733
734        // Envelope creation (if applicable, whitepaper §3 invariant 6).
735        if let Some((envelope_id, spec, parent_id)) = create_envelope {
736            self.envelope_store
737                .create_in_tx(&mut tx, envelope_id, spec, parent_id.as_deref())
738                .await?;
739            info!(
740                envelope_id = %envelope_id,
741                actor_id = %spec.actor_id,
742                grantor_id = %spec.grantor_id,
743                budget = spec.budget,
744                "envelope created in transaction"
745            );
746        }
747
748        // Event append.
749        self.event_log.append_in_tx(&mut tx, event).await?;
750
751        // Audit trail update — atomic with event (whitepaper §3 invariant 5).
752        let log_index = event.log_index as u64;
753        self.audit_log
754            .append_leaf_in_tx(&mut tx, log_index, &event.event_hash)
755            .await
756            .map_err(|e| KernelError::Audit(e.to_string()))?;
757
758        // Checkpoint is NOT generated on every event. It is a derived artifact
759        // (tree root computed from leaf hashes) and can be generated on-demand
760        // when queried (receipt, show, verify) or via explicit checkpoint command.
761        // This keeps the write path fast and lock-free for concurrent access.
762        tx.commit().await?;
763        info!(event_id = %event.id, log_index = event.log_index, "event committed");
764        Ok(())
765    }
766
767    /// PIP-002 §2+§3: Validate execute action payload.
768    ///
769    /// The kernel MUST reject an execute submission that is missing any required
770    /// field or uses an invalid OID format. Returns the artifact_hash on success.
771    fn validate_execute_payload(payload: &Value) -> KernelResult<String> {
772        Self::require_valid_oid(payload, "input_oid")?;
773        Self::require_valid_oid(payload, "output_oid")?;
774
775        if payload.get("exit_code").and_then(|v| v.as_i64()).is_none() {
776            return Err(KernelError::ExecutePayloadInvalid(
777                "missing or invalid exit_code (must be integer)".to_string(),
778            ));
779        }
780
781        let artifact_hash = Self::require_valid_oid(payload, "artifact_hash")?;
782        Ok(artifact_hash)
783    }
784
785    /// Validate that a payload field exists and matches OID format: `sha256:<64 hex chars>`.
786    fn require_valid_oid(payload: &Value, field: &str) -> KernelResult<String> {
787        let val = payload
788            .get(field)
789            .and_then(|v| v.as_str())
790            .ok_or_else(|| KernelError::ExecutePayloadInvalid(format!("missing {field}")))?;
791
792        if !val.starts_with("sha256:") || val.len() != 71 {
793            return Err(KernelError::ExecutePayloadInvalid(format!(
794                "{field} must be sha256:<64 hex chars>, got: {val}"
795            )));
796        }
797        let hex_part = &val[7..];
798        if !hex_part.chars().all(|c| c.is_ascii_hexdigit()) {
799            return Err(KernelError::ExecutePayloadInvalid(format!(
800                "{field} contains non-hex characters: {val}"
801            )));
802        }
803
804        Ok(val.to_string())
805    }
806
807    async fn read_query(&self, query: ReadQuery) -> KernelResult<Value> {
808        // Visibility boundary gate for read queries.
809        // Currently all-transparent. When multi-user collaboration is
810        // introduced, check_read_access will enforce readable scope per
811        // requester. See PIP-001 §8 (writable_targets).
812        let requester = query.requester_id.as_deref().unwrap_or("anonymous");
813        check_read_access(requester, &query.kind)?;
814
815        match query.kind.as_str() {
816            "health" => Ok(json!({ "status": "ok" })),
817            "actor_energy" => {
818                let actor_id = query.actor_id.ok_or_else(|| {
819                    KernelError::InvalidRequest("actor_id is required for actor_energy".to_string())
820                })?;
821                let (energy_balance, reserved_energy) =
822                    self.energy_ledger.balance_view(&actor_id).await?;
823                Ok(json!({
824                    "actor_id": actor_id,
825                    "energy_balance": energy_balance,
826                    "reserved_energy": reserved_energy
827                }))
828            }
829            "events" => {
830                let limit = query.limit.unwrap_or(20).clamp(1, 500);
831                let events = self
832                    .event_log
833                    .query(
834                        query.actor_id.as_deref(),
835                        query.before_index,
836                        query.after_index,
837                        limit,
838                    )
839                    .await?;
840                // Pagination metadata: smallest log_index in result set.
841                // Client passes this as `before_index` to fetch the next page.
842                let next_cursor = events.last().map(|e| e.log_index);
843                let has_more = events.len() as i64 == limit;
844                Ok(json!({
845                    "events": events,
846                    "has_more": has_more,
847                    "next_cursor": next_cursor
848                }))
849            }
850            "stats" => {
851                let event_count = self.event_log.count().await?;
852                Ok(json!({ "event_count": event_count }))
853            }
854            "snapshot" => {
855                // Legacy: snapshot is superseded by audit checkpoint.
856                // Return audit checkpoint data for backward compatibility.
857                let event_count = self.event_log.count().await?;
858                self.audit_log
859                    .ensure_checkpoint(event_count as u64)
860                    .await
861                    .map_err(|e| KernelError::Audit(e.to_string()))?;
862                let cp = self
863                    .audit_log
864                    .latest_checkpoint()
865                    .await
866                    .map_err(|e| KernelError::Audit(e.to_string()))?;
867                Ok(json!({
868                    "event_count": event_count,
869                    "snapshot_hash": cp.root_hash,
870                    "generated_at": cp.created_at
871                }))
872            }
873            "paths" => {
874                let paths = self.state_store.paths();
875                Ok(json!({
876                    "root": paths.root.display().to_string(),
877                    "workspace_root": paths.workspace_root.display().to_string(),
878                    "quarantine_root": paths.quarantine_root.display().to_string(),
879                    "db_path": paths.db_path.display().to_string()
880                }))
881            }
882            "audit_checkpoint" => {
883                let event_count = self.event_log.count().await?;
884                self.audit_log
885                    .ensure_checkpoint(event_count as u64)
886                    .await
887                    .map_err(|e| KernelError::Audit(e.to_string()))?;
888                let cp = self
889                    .audit_log
890                    .latest_checkpoint()
891                    .await
892                    .map_err(|e| KernelError::Audit(e.to_string()))?;
893                Ok(serde_json::to_value(cp)?)
894            }
895            "audit_inclusion_proof" => {
896                let log_index = query.log_index.ok_or_else(|| {
897                    KernelError::InvalidRequest(
898                        "log_index is required for audit_inclusion_proof".to_string(),
899                    )
900                })? as u64;
901                let tree_size = match query.tree_size {
902                    Some(s) => s as u64,
903                    None => {
904                        // Ensure checkpoint is current before deriving tree_size.
905                        let event_count = self.event_log.count().await? as u64;
906                        self.audit_log
907                            .ensure_checkpoint(event_count)
908                            .await
909                            .map_err(|e| KernelError::Audit(e.to_string()))?;
910                        self.audit_log
911                            .tree_size()
912                            .await
913                            .map_err(|e| KernelError::Audit(e.to_string()))?
914                    }
915                };
916                let proof = self
917                    .audit_log
918                    .inclusion_proof(log_index, tree_size)
919                    .await
920                    .map_err(|e| KernelError::Audit(e.to_string()))?;
921                Ok(json!({
922                    "log_index": log_index,
923                    "tree_size": tree_size,
924                    "proof": proof
925                }))
926            }
927            "audit_consistency_proof" => {
928                let old_size = query.old_size.ok_or_else(|| {
929                    KernelError::InvalidRequest(
930                        "old_size is required for audit_consistency_proof".to_string(),
931                    )
932                })? as u64;
933                let new_size = query.tree_size.ok_or_else(|| {
934                    KernelError::InvalidRequest(
935                        "tree_size is required for audit_consistency_proof".to_string(),
936                    )
937                })? as u64;
938                let proof = self
939                    .audit_log
940                    .consistency_proof(old_size, new_size)
941                    .await
942                    .map_err(|e| KernelError::Audit(e.to_string()))?;
943                Ok(json!({
944                    "old_size": old_size,
945                    "new_size": new_size,
946                    "proof": proof
947                }))
948            }
949            // Phase 2: stellar configuration read query.
950            "stellar_info" => {
951                let config = &self.stellar_config;
952                Ok(json!({
953                    "gpu_model": config.gpu_model,
954                    "cpu_model": config.cpu_model,
955                    "int8_tops": config.int8_tops,
956                    "energy_per_tick": config.effective_energy_per_tick(),
957                    "tick_interval_ms": config.tick_interval_ms,
958                    "luminosity_source": config.luminosity_source
959                }))
960            }
961            // Phase 4b: envelope_info read query — retrieve active envelope for an actor.
962            "envelope_info" => {
963                let actor_id = query.actor_id.ok_or_else(|| {
964                    KernelError::InvalidRequest(
965                        "actor_id is required for envelope_info".to_string(),
966                    )
967                })?;
968                let envelope = self.envelope_store.get_active_for_actor(&actor_id).await?;
969                match envelope {
970                    Some(record) => Ok(serde_json::to_value(record)?),
971                    None => Ok(json!({ "actor_id": actor_id, "envelope": null })),
972                }
973            }
974            // Phase 1: actor_info read query — retrieve actor record.
975            "actor_info" => {
976                let actor_id = query.actor_id.ok_or_else(|| {
977                    KernelError::InvalidRequest("actor_id is required for actor_info".to_string())
978                })?;
979                let actor = self.actor_store.get(&actor_id).await?;
980                match actor {
981                    Some(record) => Ok(serde_json::to_value(record)?),
982                    None => Err(KernelError::ActorNotFound(actor_id)),
983                }
984            }
985            // PIP-001 §11: hold_info — retrieve a single hold_request by hold_id.
986            "hold_info" => {
987                let hold_id = query.hold_id.ok_or_else(|| {
988                    KernelError::InvalidRequest("hold_id is required for hold_info".to_string())
989                })?;
990                let hold = self.envelope_store.get_hold_request(&hold_id).await?;
991                match hold {
992                    Some(record) => Ok(record),
993                    None => Err(KernelError::InvalidRequest(format!(
994                        "hold_request not found: {hold_id}"
995                    ))),
996                }
997            }
998            // PIP-001 §11: holds_pending — list pending hold_requests, optionally filtered by agent.
999            "holds_pending" => {
1000                let holds = self
1001                    .envelope_store
1002                    .list_pending_holds(query.actor_id.as_deref())
1003                    .await?;
1004                Ok(json!({ "holds": holds }))
1005            }
1006            other => Err(KernelError::InvalidRequest(format!(
1007                "unsupported read query kind: {other}"
1008            ))),
1009        }
1010    }
1011}
1012
/// Current wall-clock time as milliseconds since the Unix epoch, rendered
/// as a decimal string.
///
/// If the system clock reads earlier than the epoch the result is `"0"`.
fn now_millis_string() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    let millis = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        // Clock before the epoch: same as a zero duration.
        Err(_) => 0,
    };
    millis.to_string()
}
1019
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock reading earlier than the epoch yields `0`. The millisecond count
/// is narrowed from `u128` to `u64` with an `as` cast.
fn now_millis_u64() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
1026
1027/// If the action is a `system/policy` Create, extract the policy version string.
1028/// Returns `None` for all other actions.
1029fn parse_policy_version(action: &Action) -> Option<String> {
1030    if !matches!(action.action_type, ActionType::Create) || action.target != "system/policy" {
1031        return None;
1032    }
1033    action
1034        .payload
1035        .get("version")
1036        .and_then(|v| v.as_str())
1037        .map(|s| s.to_string())
1038}
1039
1040/// Phase 1: Parse actor creation spec from a `ledger/actor` Create action.
1041///
1042/// Backward compatible: old format `{actor_id, energy_balance}` is supported
1043/// by defaulting to actor_type=agent, purpose="legacy", writable_targets=[].
1044///
1045/// New format adds: actor_type, purpose, writable_targets, energy_share.
1046async fn parse_create_actor_spec(
1047    action: &Action,
1048    actor_store: &ActorStore,
1049) -> KernelResult<Option<CreateActorSpec>> {
1050    if !matches!(action.action_type, ActionType::Create) || action.target != "ledger/actor" {
1051        return Ok(None);
1052    }
1053
1054    let payload = action.payload.as_object().ok_or_else(|| {
1055        KernelError::InvalidRequest("seed actor payload must be an object".to_string())
1056    })?;
1057
1058    // Required: actor_id (or derive from purpose)
1059    let explicit_id = payload.get("actor_id").and_then(Value::as_str);
1060    let purpose = payload
1061        .get("purpose")
1062        .and_then(Value::as_str)
1063        .unwrap_or("legacy");
1064    let energy_balance = payload
1065        .get("energy_balance")
1066        .and_then(Value::as_i64)
1067        .unwrap_or(1000);
1068
1069    // Actor type: default to "agent" for backward compat
1070    let actor_type_str = payload
1071        .get("actor_type")
1072        .and_then(Value::as_str)
1073        .unwrap_or("agent");
1074    let actor_type = ActorType::parse(actor_type_str).ok_or_else(|| {
1075        KernelError::InvalidRequest(format!("invalid actor_type: {actor_type_str}"))
1076    })?;
1077
1078    // Derive ID if not explicitly provided
1079    let actor_id = if let Some(id) = explicit_id {
1080        id.to_string()
1081    } else {
1082        if purpose == "legacy" {
1083            return Err(KernelError::InvalidRequest(
1084                "actor_id or purpose is required to create an actor".to_string(),
1085            ));
1086        }
1087        let seq = actor_store.next_sequence(&action.actor_id, purpose).await?;
1088        derive_agent_id(&action.actor_id, purpose, seq)
1089    };
1090
1091    // Build lineage from creator (PIP-001 §7) + enforce §5 (only humans create agents)
1092    let creator_record = actor_store.get(&action.actor_id).await?;
1093    let (creator_type, creator_lineage) = match &creator_record {
1094        Some(record) => (record.actor_type.clone(), record.lineage.clone()),
1095        // Fallback for legacy actors not yet in actors table
1096        None => (ActorType::Human, vec![]),
1097    };
1098
1099    // PIP-001 §5: Only Humans can create Agents. Reject Agent as creator.
1100    if creator_type == ActorType::Agent && actor_type == ActorType::Agent {
1101        return Err(KernelError::PolicyViolation(
1102            "agents cannot create agents — creation right belongs to humans (PIP-001 §5)"
1103                .to_string(),
1104        ));
1105    }
1106
1107    let lineage = build_lineage(&creator_type, &action.actor_id, &creator_lineage);
1108
1109    // Writable targets: parse from payload or default to empty
1110    let writable_targets: Vec<WritableTarget> = if let Some(targets_val) =
1111        payload.get("writable_targets")
1112    {
1113        serde_json::from_value(targets_val.clone())
1114            .map_err(|e| KernelError::InvalidRequest(format!("invalid writable_targets: {e}")))?
1115    } else {
1116        vec![]
1117    };
1118
1119    // Phase 3 PIP-001 §11: validate child targets are subset of creator's boundary.
1120    if !writable_targets.is_empty()
1121        && let Some(ref creator) = creator_record
1122    {
1123        validate_child_targets(creator, &writable_targets)?;
1124    }
1125
1126    // Energy share: default 0.0
1127    let energy_share = payload
1128        .get("energy_share")
1129        .and_then(Value::as_f64)
1130        .unwrap_or(0.0);
1131
1132    let reduction_policy = payload
1133        .get("reduction_policy")
1134        .and_then(Value::as_str)
1135        .unwrap_or("none")
1136        .to_string();
1137
1138    Ok(Some(CreateActorSpec {
1139        actor_id,
1140        actor_type,
1141        creator_id: action.actor_id.clone(),
1142        lineage,
1143        purpose: Some(purpose.to_string()),
1144        writable_targets,
1145        energy_balance,
1146        energy_share,
1147        reduction_policy,
1148    }))
1149}
1150
1151/// Phase 4b: Parse envelope creation spec from a `ledger/envelope` Create action.
1152///
1153/// Returns (envelope_id, EnvelopeSpec, parent_envelope_id) if applicable.
1154/// The grantor is always the action's actor_id (the human or parent agent granting
1155/// the envelope).
1156///
1157/// PIP-001 §11: If the grantor is an agent, their active envelope is used as the parent
1158/// and reduction validation is performed.
1159async fn parse_create_envelope_spec(
1160    action: &Action,
1161    envelope_store: &EnvelopeStore,
1162) -> KernelResult<Option<(String, EnvelopeSpec, Option<String>)>> {
1163    if !matches!(action.action_type, ActionType::Create) || action.target != "ledger/envelope" {
1164        return Ok(None);
1165    }
1166
1167    let payload = action.payload.as_object().ok_or_else(|| {
1168        KernelError::InvalidRequest("envelope payload must be an object".to_string())
1169    })?;
1170
1171    let actor_id = payload
1172        .get("actor_id")
1173        .and_then(Value::as_str)
1174        .ok_or_else(|| {
1175            KernelError::InvalidRequest("actor_id is required to create an envelope".to_string())
1176        })?
1177        .to_string();
1178
1179    let budget = payload
1180        .get("budget")
1181        .and_then(Value::as_i64)
1182        .ok_or_else(|| {
1183            KernelError::InvalidRequest("budget is required to create an envelope".to_string())
1184        })?;
1185
1186    if budget <= 0 {
1187        return Err(KernelError::InvalidRequest(
1188            "envelope budget must be positive".to_string(),
1189        ));
1190    }
1191
1192    let targets: Vec<String> = if let Some(targets_val) = payload.get("targets") {
1193        serde_json::from_value(targets_val.clone())
1194            .map_err(|e| KernelError::InvalidRequest(format!("invalid envelope targets: {e}")))?
1195    } else {
1196        return Err(KernelError::InvalidRequest(
1197            "targets are required to create an envelope".to_string(),
1198        ));
1199    };
1200
1201    let actions: Vec<String> = if let Some(actions_val) = payload.get("actions") {
1202        serde_json::from_value(actions_val.clone())
1203            .map_err(|e| KernelError::InvalidRequest(format!("invalid envelope actions: {e}")))?
1204    } else {
1205        return Err(KernelError::InvalidRequest(
1206            "actions are required to create an envelope".to_string(),
1207        ));
1208    };
1209
1210    let duration_secs = payload.get("duration_secs").and_then(Value::as_i64);
1211    let report_every = payload.get("report_every").and_then(Value::as_i64);
1212    let hold_timeout_secs = payload.get("hold_timeout_secs").and_then(Value::as_i64);
1213
1214    // PIP-001 §11a: Parse hold_on rules.
1215    let hold_on: Vec<HoldRule> = if let Some(hold_on_val) = payload.get("hold_on") {
1216        serde_json::from_value(hold_on_val.clone())
1217            .map_err(|e| KernelError::InvalidRequest(format!("invalid hold_on rules: {e}")))?
1218    } else {
1219        vec![]
1220    };
1221
1222    let spec = EnvelopeSpec {
1223        actor_id,
1224        grantor_id: action.actor_id.clone(),
1225        budget,
1226        targets,
1227        actions,
1228        duration_secs,
1229        report_every,
1230        hold_on,
1231        hold_timeout_secs,
1232    };
1233
1234    // PIP-001 §11: If grantor is an agent with an active envelope, validate reduction.
1235    let parent_envelope_id = if let Some(grantor_envelope) = envelope_store
1236        .get_active_for_actor(&action.actor_id)
1237        .await?
1238    {
1239        consent::validate_envelope_reduction(&grantor_envelope, &spec)?;
1240        Some(grantor_envelope.envelope_id)
1241    } else {
1242        None
1243    };
1244
1245    let envelope_id = Uuid::new_v4().to_string();
1246
1247    Ok(Some((envelope_id, spec, parent_envelope_id)))
1248}
1249
1250// ---------------------------------------------------------------------------
1251// PIP-001 §11d: Hold response parsing and execution
1252// ---------------------------------------------------------------------------
1253
1254/// Parse a `ledger/hold/<hold_id>` mutate target into (hold_id, decision, instruction).
1255///
1256/// Returns `None` for any other target.
1257fn parse_hold_response(target: &str, payload: &Value) -> Option<(String, String, Option<String>)> {
1258    // Pattern: "ledger/hold/<hold_id>"
1259    let rest = target.strip_prefix("ledger/hold/")?;
1260    // Ensure there is a non-empty hold_id segment and nothing after it.
1261    if rest.is_empty() || rest.contains('/') {
1262        return None;
1263    }
1264    let hold_id = rest.to_string();
1265
1266    let decision = payload.get("decision").and_then(Value::as_str)?.to_string();
1267
1268    // Validate decision value.
1269    if decision != "approve" && decision != "reject" {
1270        return None;
1271    }
1272
1273    let instruction = payload
1274        .get("instruction")
1275        .and_then(Value::as_str)
1276        .map(|s| s.to_string());
1277
1278    Some((hold_id, decision, instruction))
1279}
1280
1281impl Kernel {
1282    /// PIP-001 §11d: Execute a hold_response (approve or reject).
1283    ///
1284    /// Validates that:
1285    /// 1. The caller is a Human actor (whitepaper §2: Human sovereignty).
1286    /// 2. The hold_request exists and is still pending.
1287    /// 3. The hold envelope belongs to an Agent actor.
1288    ///
1289    /// On approve: re-executes the pending action and writes a `hold_response` event.
1290    /// On reject: discards the pending action and writes a `hold_response` event.
1291    async fn execute_hold_response(
1292        &self,
1293        human_action: &Action,
1294        hold_id: &str,
1295        decision: &str,
1296        instruction: Option<&str>,
1297    ) -> KernelResult<SubmitReceipt> {
1298        // --- Validate caller is Human ---
1299        let caller = self
1300            .actor_store
1301            .get(&human_action.actor_id)
1302            .await?
1303            .ok_or_else(|| KernelError::ActorNotFound(human_action.actor_id.clone()))?;
1304
1305        if caller.actor_type != punkgo_core::actor::ActorType::Human {
1306            return Err(KernelError::PolicyViolation(format!(
1307                "only Human actors can resolve holds; caller {} is {:?}",
1308                human_action.actor_id, caller.actor_type
1309            )));
1310        }
1311
1312        // --- Load hold_request ---
1313        let hold_record = self
1314            .envelope_store
1315            .get_hold_request(hold_id)
1316            .await?
1317            .ok_or_else(|| {
1318                KernelError::PolicyViolation(format!("hold_request not found: {hold_id}"))
1319            })?;
1320
1321        let _envelope_id = hold_record
1322            .get("envelope_id")
1323            .and_then(Value::as_str)
1324            .ok_or_else(|| {
1325                KernelError::PolicyViolation("hold_request missing envelope_id".to_string())
1326            })?
1327            .to_string();
1328
1329        let agent_id = hold_record
1330            .get("agent_id")
1331            .and_then(Value::as_str)
1332            .ok_or_else(|| {
1333                KernelError::PolicyViolation("hold_request missing agent_id".to_string())
1334            })?
1335            .to_string();
1336
1337        let pending_payload = hold_record
1338            .get("pending_payload")
1339            .cloned()
1340            .unwrap_or(Value::Null);
1341
1342        let trigger_target = hold_record
1343            .get("trigger_target")
1344            .and_then(Value::as_str)
1345            .unwrap_or("")
1346            .to_string();
1347
1348        let trigger_action = hold_record
1349            .get("trigger_action")
1350            .and_then(Value::as_str)
1351            .unwrap_or("")
1352            .to_string();
1353
1354        let status = hold_record
1355            .get("status")
1356            .and_then(Value::as_str)
1357            .unwrap_or("");
1358        if status != "pending" {
1359            return Err(KernelError::PolicyViolation(format!(
1360                "hold_request {hold_id} is already resolved (status={status})"
1361            )));
1362        }
1363
1364        // PIP-001 §11e: Check if this hold has already timed out.
1365        // If so, run the lazy expiry first — it will auto-reject the hold.
1366        let envelope_id_str = hold_record
1367            .get("envelope_id")
1368            .and_then(Value::as_str)
1369            .unwrap_or("")
1370            .to_string();
1371        if let Ok(Some(env)) = self.envelope_store.get(&envelope_id_str).await {
1372            if let Some(timeout_secs) = env.hold_timeout_secs {
1373                if timeout_secs > 0 {
1374                    let triggered_at: u64 = hold_record
1375                        .get("triggered_at")
1376                        .and_then(|v| v.as_str())
1377                        .and_then(|s| s.parse().ok())
1378                        .unwrap_or(0);
1379                    let now = now_millis_u64();
1380                    if now.saturating_sub(triggered_at) > (timeout_secs as u64) * 1000 {
1381                        // Expire this hold first, then return error.
1382                        self.expire_timed_out_holds(&envelope_id_str, timeout_secs)
1383                            .await?;
1384                        return Err(KernelError::PolicyViolation(format!(
1385                            "hold_request {hold_id} has timed out and was auto-rejected"
1386                        )));
1387                    }
1388                }
1389            }
1390        }
1391
1392        // --- Build hold_response event payload (PIP-001 §11d) ---
1393        let response_payload = json!({
1394            "hold_id": hold_id,
1395            "agent_id": &agent_id,
1396            "decision": decision,
1397            "instruction": instruction,
1398            "trigger": {
1399                "target": &trigger_target,
1400                "action_type": &trigger_action
1401            },
1402            "resolved_by": &human_action.actor_id,
1403            "resolved_at": now_millis_string()
1404        });
1405
1406        let response_action = Action {
1407            actor_id: human_action.actor_id.clone(),
1408            action_type: ActionType::Mutate,
1409            target: format!("ledger/hold/{hold_id}"),
1410            payload: response_payload.clone(),
1411            timestamp: None,
1412        };
1413
1414        // --- Atomic transaction: resolve hold_request + settle energy + write event ---
1415        // Envelope stays Active throughout — no status change needed (PIP-001 §11b).
1416        let pool = self.state_store.pool();
1417        let mut tx = pool.begin().await?;
1418
1419        self.envelope_store
1420            .resolve_hold_request_in_tx(&mut tx, hold_id, decision, instruction)
1421            .await?;
1422
1423        // PIP-001 §11f: On reject, settle commitment cost (20% of reserved).
1424        // settle_in_tx(reserved=full, actual=commitment) releases the rest.
1425        let hold_reserved_cost = pending_payload
1426            .get("reserved_cost")
1427            .and_then(|v| v.as_i64())
1428            .unwrap_or(0);
1429
1430        let commitment_cost = if decision == "reject" && hold_reserved_cost > 0 {
1431            let cost = ((hold_reserved_cost as f64) * 0.2).ceil() as i64;
1432            self.energy_ledger
1433                .settle_in_tx(&mut tx, &agent_id, hold_reserved_cost, cost)
1434                .await?;
1435            cost
1436        } else {
1437            0
1438        };
1439
1440        let mut response_event = EventRecord {
1441            id: Uuid::new_v4().to_string(),
1442            log_index: 0,
1443            event_hash: String::new(),
1444            actor_id: human_action.actor_id.clone(),
1445            action_type: "hold_response".to_string(),
1446            target: format!("ledger/hold/{hold_id}"),
1447            payload: response_payload.clone(),
1448            payload_hash: payload_hash_hex(&response_action)?,
1449            artifact_hash: None,
1450            reserved_energy: hold_reserved_cost,
1451            settled_energy: commitment_cost,
1452            timestamp: now_millis_string(),
1453        };
1454        self.event_log
1455            .append_in_tx(&mut tx, &mut response_event)
1456            .await?;
1457
1458        // Audit trail — atomic with event (whitepaper §3 invariant 5).
1459        let log_index = response_event.log_index as u64;
1460        self.audit_log
1461            .append_leaf_in_tx(&mut tx, log_index, &response_event.event_hash)
1462            .await
1463            .map_err(|e| KernelError::Audit(e.to_string()))?;
1464        // Checkpoint generated lazily on read, not here.
1465        tx.commit().await?;
1466
1467        info!(
1468            hold_id = %hold_id,
1469            decision = %decision,
1470            agent_id = %agent_id,
1471            resolved_by = %human_action.actor_id,
1472            "PIP-001 §11d: hold resolved"
1473        );
1474
1475        // --- Approve: re-execute the pending action (PIP-001 §11d) ---
1476        if decision == "approve" {
1477            // Reconstruct the original action from the pending_payload stored in hold_request.
1478            let orig_target = pending_payload
1479                .get("target")
1480                .and_then(Value::as_str)
1481                .unwrap_or(&trigger_target)
1482                .to_string();
1483            let orig_action_type_str = pending_payload
1484                .get("action_type")
1485                .and_then(Value::as_str)
1486                .unwrap_or(&trigger_action)
1487                .to_string();
1488            let mut orig_payload = pending_payload
1489                .get("payload")
1490                .cloned()
1491                .unwrap_or(Value::Null);
1492
1493            // PIP-001 §11d: Energy was already reserved at hold trigger time.
1494            // `reserved_cost` was stored in pending_payload by the hold trigger.
1495            // Inject `_hold_reserved_cost` so submit_action skips double quote/reserve.
1496
1497            // Mark the payload as hold-approved so the re-execution bypasses the hold check (PIP-001 §11d).
1498            // This sentinel prevents a re-trigger loop.
1499            if let Some(obj) = orig_payload.as_object_mut() {
1500                obj.insert("_hold_approved".to_string(), Value::Bool(true));
1501                obj.insert(
1502                    "_hold_reserved_cost".to_string(),
1503                    Value::Number(hold_reserved_cost.into()),
1504                );
1505                // PIP-001 §11d: If instruction is present, append it to the payload.
1506                if let Some(instr) = instruction {
1507                    obj.insert(
1508                        "_hold_instruction".to_string(),
1509                        Value::String(instr.to_string()),
1510                    );
1511                }
1512            } else {
1513                // If payload is not an object (e.g. null), wrap it.
1514                let mut obj = serde_json::Map::new();
1515                obj.insert("_hold_approved".to_string(), Value::Bool(true));
1516                obj.insert(
1517                    "_hold_reserved_cost".to_string(),
1518                    Value::Number(hold_reserved_cost.into()),
1519                );
1520                if let Some(instr) = instruction {
1521                    obj.insert(
1522                        "_hold_instruction".to_string(),
1523                        Value::String(instr.to_string()),
1524                    );
1525                }
1526                orig_payload = Value::Object(obj);
1527            }
1528
1529            let orig_action_type = match orig_action_type_str.as_str() {
1530                "observe" => ActionType::Observe,
1531                "create" => ActionType::Create,
1532                "mutate" => ActionType::Mutate,
1533                _ => ActionType::Execute,
1534            };
1535
1536            let pending_action = Action {
1537                actor_id: agent_id.clone(),
1538                action_type: orig_action_type,
1539                target: orig_target,
1540                payload: orig_payload,
1541                timestamp: None,
1542            };
1543
1544            info!(
1545                hold_id = %hold_id,
1546                agent_id = %agent_id,
1547                "PIP-001 §11d: re-executing approved pending action"
1548            );
1549
1550            // Re-submit the pending action on behalf of the agent.
1551            // This goes through the full pipeline (quota → reserve → execute → settle → append).
1552            // Use Box::pin to break the async recursion (submit_action → execute_hold_response → submit_action).
1553            return Box::pin(self.submit_action(pending_action)).await;
1554        }
1555
1556        // --- Reject: return a receipt with commitment cost (PIP-001 §11f) ---
1557        Ok(SubmitReceipt {
1558            event_id: response_event.id,
1559            log_index: response_event.log_index,
1560            event_hash: response_event.event_hash,
1561            reserved_cost: hold_reserved_cost,
1562            settled_cost: commitment_cost,
1563            artifact_hash: None,
1564        })
1565    }
1566
1567    /// PIP-001 §11e: Lazy expiry for timed-out holds.
1568    ///
1569    /// Called before hold-trigger checks in submit_action. Scans pending holds
1570    /// for the given envelope and auto-rejects any that have exceeded
1571    /// `hold_timeout_secs`. Same pattern as `is_envelope_expired()` — no
1572    /// background thread, just check-on-access.
1573    async fn expire_timed_out_holds(
1574        &self,
1575        envelope_id: &str,
1576        hold_timeout_secs: i64,
1577    ) -> KernelResult<()> {
1578        let holds = self
1579            .envelope_store
1580            .list_pending_holds_for_envelope(envelope_id)
1581            .await?;
1582        let now = now_millis_u64();
1583
1584        for hold in holds {
1585            let triggered_at: u64 = hold
1586                .get("triggered_at")
1587                .and_then(|v| v.as_str())
1588                .and_then(|s| s.parse().ok())
1589                .unwrap_or(0);
1590
1591            if now.saturating_sub(triggered_at) <= (hold_timeout_secs as u64) * 1000 {
1592                continue;
1593            }
1594
1595            // Timed out — treat as reject (PIP-001 §11e/§11f).
1596            let hold_id = hold.get("hold_id").and_then(|v| v.as_str()).unwrap_or("");
1597            let agent_id = hold.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
1598            let pending_payload = hold.get("pending_payload").cloned().unwrap_or(Value::Null);
1599            let reserved_cost = pending_payload
1600                .get("reserved_cost")
1601                .and_then(|v| v.as_i64())
1602                .unwrap_or(0);
1603
1604            let commitment_cost = if reserved_cost > 0 {
1605                ((reserved_cost as f64) * 0.2).ceil() as i64
1606            } else {
1607                0
1608            };
1609
1610            let timeout_payload = json!({
1611                "hold_id": hold_id,
1612                "agent_id": agent_id,
1613                "decision": "timed_out",
1614                "reserved_cost": reserved_cost,
1615                "commitment_cost": commitment_cost,
1616                "triggered_at": triggered_at.to_string(),
1617                "expired_at": now.to_string()
1618            });
1619            let timeout_action = Action {
1620                actor_id: agent_id.to_string(),
1621                action_type: ActionType::Mutate,
1622                target: format!("ledger/hold/{hold_id}"),
1623                payload: timeout_payload.clone(),
1624                timestamp: None,
1625            };
1626            let mut timeout_event = EventRecord {
1627                id: Uuid::new_v4().to_string(),
1628                log_index: 0,
1629                event_hash: String::new(),
1630                actor_id: agent_id.to_string(),
1631                action_type: "hold_timeout".to_string(),
1632                target: format!("ledger/hold/{hold_id}"),
1633                payload: timeout_payload,
1634                payload_hash: payload_hash_hex(&timeout_action)?,
1635                artifact_hash: None,
1636                reserved_energy: reserved_cost,
1637                settled_energy: commitment_cost,
1638                timestamp: now_millis_string(),
1639            };
1640
1641            let pool = self.state_store.pool();
1642            let mut tx = pool.begin().await?;
1643            self.envelope_store
1644                .resolve_hold_request_in_tx(&mut tx, hold_id, "timed_out", None)
1645                .await?;
1646            if reserved_cost > 0 {
1647                self.energy_ledger
1648                    .settle_in_tx(&mut tx, agent_id, reserved_cost, commitment_cost)
1649                    .await?;
1650            }
1651            self.event_log
1652                .append_in_tx(&mut tx, &mut timeout_event)
1653                .await?;
1654
1655            // Audit trail — atomic with event (whitepaper §3 invariant 5).
1656            let t_log_index = timeout_event.log_index as u64;
1657            self.audit_log
1658                .append_leaf_in_tx(&mut tx, t_log_index, &timeout_event.event_hash)
1659                .await
1660                .map_err(|e| KernelError::Audit(e.to_string()))?;
1661            // Checkpoint generated lazily on read, not here.
1662            tx.commit().await?;
1663
1664            info!(
1665                hold_id = %hold_id,
1666                agent_id = %agent_id,
1667                commitment_cost = commitment_cost,
1668                "PIP-001 §11e: hold auto-expired (timeout)"
1669            );
1670        }
1671        Ok(())
1672    }
1673}