1use std::path::PathBuf;
2
3use serde::{Deserialize, Serialize};
4use serde_json::{Value, json};
5use tracing::{info, warn};
6use uuid::Uuid;
7
8use crate::audit::AuditLog;
9use punkgo_core::action::{Action, ActionType, payload_hash_hex, quote_cost};
10use punkgo_core::actor::{
11 ActorType, CreateActorSpec, WritableTarget, build_lineage, derive_agent_id,
12};
13use punkgo_core::boundary::{check_writable_boundary, validate_child_targets};
14use punkgo_core::consent::{self, AuthorizationMode, CheckpointLevel, EnvelopeSpec, HoldRule};
15use punkgo_core::errors::{KernelError, KernelResult};
16use punkgo_core::policy::{check_read_access, validate_action};
17
18use super::lifecycle;
19use crate::state::{
20 ActorStore, EnergyLedger, EnergyReservation, EnvelopeStore, EventLog, EventRecord,
21 NewHoldRequest, StateStore,
22};
23use punkgo_core::protocol::{RequestEnvelope, RequestType, ResponseEnvelope};
24use punkgo_core::stellar::{StellarConfig, load_stellar_config};
25
/// Startup configuration for the kernel.
///
/// The `Default` implementation fills both fields from the environment
/// (`PUNKGO_STATE_DIR`, `PUNKGO_IPC_ENDPOINT`) with built-in fallbacks.
#[derive(Debug, Clone)]
pub struct KernelConfig {
    /// Directory passed to `StateStore::bootstrap`; `stellar.toml` is also
    /// looked up directly inside it.
    pub state_dir: PathBuf,
    /// IPC endpoint identifier. The built-in default is a named-pipe path on
    /// Windows and a bare name elsewhere.
    pub ipc_endpoint: String,
}
32
33impl Default for KernelConfig {
34 fn default() -> Self {
35 let state_dir = std::env::var("PUNKGO_STATE_DIR")
36 .map(PathBuf::from)
37 .unwrap_or_else(|_| default_state_dir());
38 let ipc_endpoint =
39 std::env::var("PUNKGO_IPC_ENDPOINT").unwrap_or_else(|_| default_ipc_endpoint());
40 Self {
41 state_dir,
42 ipc_endpoint,
43 }
44 }
45}
46
/// Returns the built-in IPC endpoint: a named-pipe path when compiled for
/// Windows, otherwise the bare name `punkgo-kernel`.
fn default_ipc_endpoint() -> String {
    let endpoint = if cfg!(windows) {
        r"\\.\pipe\punkgo-kernel"
    } else {
        "punkgo-kernel"
    };
    String::from(endpoint)
}
62
63fn default_state_dir() -> PathBuf {
64 if let Some(home) = home_dir() {
65 return home.join(".punkgo").join("state");
66 }
67 PathBuf::from("state")
68}
69
/// Best-effort lookup of the current user's home directory from the
/// environment.
///
/// Sources are tried in order: `HOME`, `USERPROFILE`, then
/// `HOMEDRIVE` + `HOMEPATH`. A variable that is set but empty is treated as
/// unset, so an empty value can never produce an empty home path (which would
/// silently turn the derived state directory into a relative one).
///
/// Returns `None` when no source yields a non-empty path.
fn home_dir() -> Option<PathBuf> {
    // Reads a variable, discarding values that are present but empty.
    let non_empty = |name: &str| std::env::var_os(name).filter(|value| !value.is_empty());

    if let Some(home) = non_empty("HOME").or_else(|| non_empty("USERPROFILE")) {
        return Some(PathBuf::from(home));
    }

    let drive = std::env::var_os("HOMEDRIVE")?;
    let path = std::env::var_os("HOMEPATH")?;
    let mut combined = PathBuf::from(drive);
    combined.push(path);
    // Both parts may individually be empty; only reject a fully empty result.
    if combined.as_os_str().is_empty() {
        None
    } else {
        Some(combined)
    }
}
86
/// Result returned to the caller for a successfully submitted action.
///
/// Field descriptions reflect how `submit_action` populates the receipt.
#[derive(Debug, Clone, Serialize)]
pub struct SubmitReceipt {
    /// Identifier of the committed event record.
    pub event_id: String,
    /// Log index of the event, read back from the record after it was appended.
    pub log_index: i64,
    /// Hash of the event, read back from the record after it was appended.
    pub event_hash: String,
    /// Energy reserved for the action.
    pub reserved_cost: i64,
    /// Energy settled for the action; `submit_action` sets it equal to `reserved_cost`.
    pub settled_cost: i64,
    /// Validated `artifact_hash` from the payload; only set for `Execute` actions.
    pub artifact_hash: Option<String>,
}
101
/// Payload of a `Read` request; deserialized in `dispatch` and interpreted by
/// `read_query`.
#[derive(Debug, Deserialize)]
struct ReadQuery {
    /// Selects the query to run (for example "health", "events", "audit_checkpoint").
    kind: String,
    /// Required by "actor_energy", "envelope_info" and "actor_info"; optional
    /// filter for "events" and "holds_pending".
    actor_id: Option<String>,
    /// Required by "hold_info".
    #[serde(default)]
    hold_id: Option<String>,
    /// Page size for "events"; defaults to 20 and is clamped to 1..=500.
    limit: Option<i64>,
    /// Required by "audit_inclusion_proof".
    log_index: Option<i64>,
    /// Required by "audit_consistency_proof" (as the new size); optional for
    /// "audit_inclusion_proof", which otherwise asks the audit log for its size.
    tree_size: Option<i64>,
    /// Required by "audit_consistency_proof".
    old_size: Option<i64>,
    /// Forwarded unchanged to `EventLog::query` by "events".
    #[serde(default)]
    before_index: Option<i64>,
    /// Forwarded unchanged to `EventLog::query` by "events".
    #[serde(default)]
    after_index: Option<i64>,
    /// Identity used for the read-access check; "anonymous" when absent.
    #[serde(default)]
    requester_id: Option<String>,
}
130
/// The kernel: owns the state store, the components built on its connection
/// pool in `bootstrap`, and the loaded stellar configuration.
pub struct Kernel {
    /// Backing store; source of the pool shared by the components below.
    state_store: StateStore,
    /// Energy reservation and settlement.
    energy_ledger: EnergyLedger,
    /// Append-only event log.
    event_log: EventLog,
    /// Audit log: leaves are appended per event; serves checkpoints and proofs.
    audit_log: AuditLog,
    /// Actor records (type, status, lineage, ...).
    actor_store: ActorStore,
    /// Envelopes and hold requests.
    envelope_store: EnvelopeStore,
    /// Configuration loaded from `stellar.toml` in the state directory.
    stellar_config: StellarConfig,
}
147
148impl Kernel {
149 pub async fn bootstrap(config: &KernelConfig) -> KernelResult<Self> {
153 let state_store = StateStore::bootstrap(&config.state_dir).await?;
154 let energy_ledger = EnergyLedger::new(state_store.pool());
155 let event_log = EventLog::new(state_store.pool());
156 let audit_log = AuditLog::new(state_store.pool(), "punkgo/kernel");
157 let actor_store = ActorStore::new(state_store.pool());
158 let envelope_store = EnvelopeStore::new(state_store.pool());
159
160 let stellar_config_path = config.state_dir.join("stellar.toml");
162 let stellar_config = load_stellar_config(&stellar_config_path)?;
163 info!(
164 energy_per_tick = stellar_config.effective_energy_per_tick(),
165 int8_tops = stellar_config.int8_tops,
166 tick_interval_ms = stellar_config.tick_interval_ms,
167 "stellar configuration loaded"
168 );
169
170 Ok(Self {
171 state_store,
172 energy_ledger,
173 event_log,
174 audit_log,
175 actor_store,
176 envelope_store,
177 stellar_config,
178 })
179 }
180
    /// Returns the stellar configuration loaded during `bootstrap`.
    pub fn stellar_config(&self) -> &StellarConfig {
        &self.stellar_config
    }
186
    /// Returns the kernel's energy ledger.
    pub fn energy_ledger(&self) -> &EnergyLedger {
        &self.energy_ledger
    }
191
    /// Returns the kernel's actor store.
    pub fn actor_store(&self) -> &ActorStore {
        &self.actor_store
    }
196
    /// Returns the kernel's envelope store.
    pub fn envelope_store(&self) -> &EnvelopeStore {
        &self.envelope_store
    }
201
    /// Returns the SQLite pool handed out by the state store.
    pub fn pool(&self) -> sqlx::SqlitePool {
        self.state_store.pool()
    }
206
207 pub async fn handle_request(&self, req: RequestEnvelope) -> ResponseEnvelope {
209 let request_id = req.request_id.clone();
210 info!(
211 request_id = %request_id,
212 request_type = ?req.request_type,
213 "received request"
214 );
215 match self.dispatch(req).await {
216 Ok(payload) => {
217 info!(request_id = %request_id, "request completed");
218 ResponseEnvelope::ok(request_id, payload)
219 }
220 Err(err) => {
221 warn!(error = %err, "request failed");
222 ResponseEnvelope::err_structured(request_id, &err)
223 }
224 }
225 }
226
227 async fn dispatch(&self, req: RequestEnvelope) -> KernelResult<Value> {
228 match req.request_type {
229 RequestType::Quote => {
230 let action: Action = serde_json::from_value(req.payload)?;
231 validate_action(&action)?;
232 let cost = quote_cost(&action);
233 Ok(json!({ "cost": cost }))
234 }
235 RequestType::Submit => {
236 let action: Action = serde_json::from_value(req.payload)?;
237 let receipt = self.submit_action(action).await?;
238 Ok(serde_json::to_value(receipt)?)
239 }
240 RequestType::Read => {
241 let query: ReadQuery = serde_json::from_value(req.payload)?;
242 self.read_query(query).await
243 }
244 }
245 }
246
247 async fn submit_action(&self, action: Action) -> KernelResult<SubmitReceipt> {
248 validate_action(&action)?;
250 if !self.state_store.actor_exists(&action.actor_id).await? {
251 return Err(KernelError::ActorNotFound(action.actor_id.clone()));
252 }
253
254 if let Some(actor) = self.actor_store.get(&action.actor_id).await? {
258 if actor.status == punkgo_core::actor::ActorStatus::Frozen
259 && action.action_type.is_state_changing()
260 {
261 return Err(KernelError::ActorFrozen(format!(
262 "actor {} is frozen and cannot perform state-changing actions",
263 action.actor_id
264 )));
265 }
266 check_writable_boundary(&actor, &action.target, &action.action_type)?;
268
269 if actor.actor_type == punkgo_core::actor::ActorType::Agent
272 && action.action_type.is_state_changing()
273 {
274 lifecycle::check_lineage_active(&self.actor_store, &actor.lineage).await?;
275 }
276
277 if action.action_type.is_state_changing() {
280 let envelope = self
281 .envelope_store
282 .get_active_for_actor(&action.actor_id)
283 .await?;
284
285 let envelope = if let Some(env) = envelope {
287 if consent::is_envelope_expired(&env, now_millis_u64()) {
288 self.envelope_store
289 .set_status(&env.envelope_id, &consent::EnvelopeStatus::Expired)
290 .await?;
291 None
292 } else {
293 Some(env)
294 }
295 } else {
296 None
297 };
298
299 let auth_mode = consent::check_authorization(&actor, envelope.as_ref())?;
300
301 if auth_mode == AuthorizationMode::ManOnTheLoop
303 && let Some(ref env) = envelope
304 {
305 consent::check_envelope_covers(
306 env,
307 &action.target,
308 action.action_type.as_str(),
309 )?;
310
311 if !env.hold_on.is_empty() {
313 if let Some(timeout_secs) = env.hold_timeout_secs {
314 if timeout_secs > 0 {
315 self.expire_timed_out_holds(&env.envelope_id, timeout_secs)
316 .await?;
317 }
318 }
319 }
320
321 let hold_approved = action
326 .payload
327 .get("_hold_approved")
328 .and_then(|v| v.as_bool())
329 .unwrap_or(false);
330 if !hold_approved
331 && consent::check_hold_trigger(
332 &env.hold_on,
333 &action.target,
334 action.action_type.as_str(),
335 )
336 {
337 let hold_id = Uuid::new_v4().to_string();
338
339 let reserved_cost = quote_cost(&action) as i64;
342
343 let hold_payload = json!({
344 "hold_id": &hold_id,
345 "agent_id": &action.actor_id,
346 "trigger": {
347 "target": &action.target,
348 "action_type": action.action_type.as_str()
349 },
350 "pending_action": {
351 "target": &action.target,
352 "action_type": action.action_type.as_str(),
353 "payload": &action.payload
354 },
355 "reserved_cost": reserved_cost,
356 "triggered_at": now_millis_string()
357 });
358
359 let hold_action = Action {
361 actor_id: action.actor_id.clone(),
362 action_type: ActionType::Mutate,
363 target: format!("ledger/hold/{hold_id}"),
364 payload: hold_payload.clone(),
365 timestamp: None,
366 };
367 let mut hold_event = EventRecord {
368 id: Uuid::new_v4().to_string(),
369 log_index: 0,
370 event_hash: String::new(),
371 actor_id: action.actor_id.clone(),
372 action_type: "hold_request".to_string(),
373 target: format!("ledger/hold/{hold_id}"),
374 payload: hold_payload.clone(),
375 payload_hash: payload_hash_hex(&hold_action)?,
376 artifact_hash: None,
377 reserved_energy: reserved_cost,
378 settled_energy: 0,
379 timestamp: now_millis_string(),
380 };
381
382 let pool = self.state_store.pool();
384 let mut tx = pool.begin().await?;
385 if reserved_cost > 0 {
386 self.energy_ledger
387 .reserve_in_tx(&mut tx, &action.actor_id, reserved_cost)
388 .await?;
389 }
390 self.event_log
391 .append_in_tx(&mut tx, &mut hold_event)
392 .await?;
393 self.envelope_store
394 .create_hold_request_in_tx(
395 &mut tx,
396 &NewHoldRequest {
397 hold_id: &hold_id,
398 envelope_id: &env.envelope_id,
399 agent_id: &action.actor_id,
400 trigger_target: &action.target,
401 trigger_action: action.action_type.as_str(),
402 pending_payload: &json!({
403 "target": &action.target,
404 "action_type": action.action_type.as_str(),
405 "payload": &action.payload,
406 "reserved_cost": reserved_cost
407 }),
408 },
409 )
410 .await?;
411 let log_index = hold_event.log_index as u64;
415 self.audit_log
416 .append_leaf_in_tx(&mut tx, log_index, &hold_event.event_hash)
417 .await
418 .map_err(|e| KernelError::Audit(e.to_string()))?;
419 tx.commit().await?;
421
422 return Err(KernelError::HoldTriggered {
423 hold_id,
424 agent_id: action.actor_id.clone(),
425 });
426 }
427 }
428 }
429 }
430
431 let hold_response = if matches!(action.action_type, ActionType::Mutate) {
434 parse_hold_response(&action.target, &action.payload)
435 } else {
436 None
437 };
438
439 if let Some((ref hold_id, ref decision, ref instruction)) = hold_response {
441 return self
442 .execute_hold_response(&action, hold_id, decision, instruction.as_deref())
443 .await;
444 }
445
446 let lifecycle_op = if matches!(action.action_type, ActionType::Mutate) {
448 lifecycle::parse_lifecycle_op(&action.target, &action.payload)
449 } else {
450 None
451 };
452
453 if let Some((ref target_actor_id, ref op)) = lifecycle_op {
455 let initiator = self
456 .actor_store
457 .get(&action.actor_id)
458 .await?
459 .ok_or_else(|| KernelError::ActorNotFound(action.actor_id.clone()))?;
460 let target_actor = self
461 .actor_store
462 .get(target_actor_id)
463 .await?
464 .ok_or_else(|| KernelError::ActorNotFound(target_actor_id.clone()))?;
465 lifecycle::validate_lifecycle_authorization(&initiator, &target_actor, op).await?;
466 }
467
468 let create_actor_spec = parse_create_actor_spec(&action, &self.actor_store).await?;
469 let create_envelope_spec =
470 parse_create_envelope_spec(&action, &self.envelope_store).await?;
471 let policy_version = parse_policy_version(&action);
472
473 let (reserved_cost, reservation) = if let Some(hold_reserved) = action
479 .payload
480 .get("_hold_reserved_cost")
481 .and_then(|v| v.as_i64())
482 {
483 let phantom = if hold_reserved > 0 {
484 Some(EnergyReservation {
485 actor_id: action.actor_id.clone(),
486 reserved: hold_reserved,
487 })
488 } else {
489 None
490 };
491 (hold_reserved, phantom)
492 } else {
493 let cost = quote_cost(&action) as i64;
496 let res = if cost > 0 {
497 Some(self.energy_ledger.reserve(&action.actor_id, cost).await?)
498 } else {
499 None
500 };
501 (cost, res)
502 };
503
504 let artifact_hash = if matches!(action.action_type, ActionType::Execute) {
508 Some(Self::validate_execute_payload(&action.payload)?)
509 } else {
510 None
511 };
512
513 let settled_cost = reserved_cost;
516
517 let mut event = EventRecord {
519 id: Uuid::new_v4().to_string(),
520 log_index: 0,
521 event_hash: String::new(),
522 actor_id: action.actor_id.clone(),
523 action_type: action.action_type.as_str().to_string(),
524 target: action.target.clone(),
525 payload: action.payload.clone(),
526 payload_hash: payload_hash_hex(&action)?,
527 artifact_hash: artifact_hash.clone(),
528 reserved_energy: reserved_cost,
529 settled_energy: settled_cost,
530 timestamp: now_millis_string(),
531 };
532 self.finalize_energy_and_event(
533 reservation.as_ref(),
534 settled_cost,
535 create_actor_spec.as_ref(),
536 create_envelope_spec.as_ref(),
537 &mut event,
538 )
539 .await?;
540
541 if let Some(ref version) = policy_version {
543 if let Err(err) = self.state_store.set_policy_version(version).await {
544 warn!(error = %err, "failed to record policy version after commit");
545 } else {
546 info!(policy_version = %version, event_id = %event.id, "policy version updated");
547 }
548 }
549
550 if action.action_type.is_state_changing()
552 && settled_cost > 0
553 && let Ok(Some(actor)) = self.actor_store.get(&action.actor_id).await
554 && actor.actor_type == ActorType::Agent
555 && let Ok(Some(envelope)) = self
556 .envelope_store
557 .get_active_for_actor(&action.actor_id)
558 .await
559 {
560 let checkpoint = consent::check_checkpoint(&envelope, settled_cost);
562
563 let pool = self.state_store.pool();
565 let mut tx = pool.begin().await?;
566 match self
567 .envelope_store
568 .consume_budget_in_tx(&mut tx, &envelope.envelope_id, settled_cost)
569 .await
570 {
571 Ok(_new_consumed) => {
572 match checkpoint {
574 Some(CheckpointLevel::Halt) => {
575 self.envelope_store
576 .set_status_in_tx(
577 &mut tx,
578 &envelope.envelope_id,
579 &consent::EnvelopeStatus::Expired,
580 )
581 .await?;
582 info!(
583 envelope_id = %envelope.envelope_id,
584 actor_id = %action.actor_id,
585 "checkpoint: HALT — envelope budget exhausted"
586 );
587 }
588 Some(CheckpointLevel::Report) => {
589 info!(
590 envelope_id = %envelope.envelope_id,
591 actor_id = %action.actor_id,
592 budget = envelope.budget,
593 consumed = envelope.budget_consumed + settled_cost,
594 "checkpoint: REPORT — summary logged"
595 );
596 }
597 None => {}
598 }
599 tx.commit().await?;
600 }
601 Err(err) => {
602 warn!(
603 error = %err,
604 "envelope budget consumption failed (event already committed)"
605 );
606 }
608 }
609 }
610
611 if let Some((ref target_actor_id, ref op)) = lifecycle_op {
613 let pool = self.state_store.pool();
614 match op {
615 punkgo_core::actor::LifecycleOp::Freeze { .. } => {
616 match lifecycle::execute_freeze(&self.actor_store, &pool, target_actor_id).await
617 {
618 Ok(frozen_ids) => {
619 info!(
620 target = %target_actor_id,
621 cascade_count = frozen_ids.len(),
622 "lifecycle: freeze executed"
623 );
624 }
625 Err(err) => {
626 warn!(error = %err, "lifecycle: freeze failed (event already committed)");
627 }
628 }
629 }
630 punkgo_core::actor::LifecycleOp::Unfreeze => {
631 match lifecycle::execute_unfreeze(&self.actor_store, &pool, target_actor_id)
632 .await
633 {
634 Ok(()) => {
635 info!(target = %target_actor_id, "lifecycle: unfreeze executed");
636 }
637 Err(err) => {
638 warn!(error = %err, "lifecycle: unfreeze failed (event already committed)");
639 }
640 }
641 }
642 punkgo_core::actor::LifecycleOp::Terminate { .. } => {
643 match lifecycle::execute_freeze(&self.actor_store, &pool, target_actor_id).await
646 {
647 Ok(frozen_ids) => {
648 info!(
649 target = %target_actor_id,
650 cascade_count = frozen_ids.len(),
651 "lifecycle: terminate executed (cascade frozen)"
652 );
653 }
654 Err(err) => {
655 warn!(error = %err, "lifecycle: terminate failed (event already committed)");
656 }
657 }
658 }
659 punkgo_core::actor::LifecycleOp::UpdateEnergyShare { energy_share } => {
660 match lifecycle::execute_update_energy_share(
661 &self.actor_store,
662 &pool,
663 target_actor_id,
664 *energy_share,
665 )
666 .await
667 {
668 Ok(()) => {
669 info!(
670 target = %target_actor_id,
671 energy_share,
672 "lifecycle: energy_share updated"
673 );
674 }
675 Err(err) => {
676 warn!(error = %err, "lifecycle: update_energy_share failed (event already committed)");
677 }
678 }
679 }
680 }
681 }
682
683 Ok(SubmitReceipt {
684 event_id: event.id,
685 log_index: event.log_index,
686 event_hash: event.event_hash,
687 reserved_cost,
688 settled_cost,
689 artifact_hash,
690 })
691 }
692
    /// Commits an action's effects in a single database transaction.
    ///
    /// In order: settles the energy reservation (if any), creates the new
    /// actor and its energy account (if requested), creates the envelope (if
    /// requested), appends the event, and appends the matching audit leaf.
    /// Nothing is committed unless every step succeeds.
    ///
    /// * `reservation` – reservation to settle; `None` skips settlement.
    /// * `settled_cost` – amount passed to `settle_in_tx`.
    /// * `create_actor` – spec of an actor to create in the same transaction.
    /// * `create_envelope` – `(envelope_id, spec, parent_envelope_id)` to create.
    /// * `event` – record to append; its `log_index` and `event_hash` are read
    ///   after the append (presumably filled in by `append_in_tx` — confirm).
    ///
    /// # Errors
    /// Returns the first storage error, or `KernelError::Audit` if the audit
    /// leaf cannot be appended.
    async fn finalize_energy_and_event(
        &self,
        reservation: Option<&EnergyReservation>,
        settled_cost: i64,
        create_actor: Option<&CreateActorSpec>,
        create_envelope: Option<&(String, EnvelopeSpec, Option<String>)>,
        event: &mut EventRecord,
    ) -> KernelResult<()> {
        let pool = self.state_store.pool();
        let mut tx = pool.begin().await?;

        // Settle the reserved amount against the actual cost.
        if let Some(res) = reservation {
            self.energy_ledger
                .settle_in_tx(&mut tx, &res.actor_id, res.reserved, settled_cost)
                .await?;
        }

        // The energy account is created before the actor record.
        if let Some(spec) = create_actor {
            self.energy_ledger
                .create_actor_in_tx(&mut tx, &spec.actor_id, spec.energy_balance)
                .await?;
            self.actor_store.create_in_tx(&mut tx, spec).await?;
            info!(
                created_actor = %spec.actor_id,
                actor_type = spec.actor_type.as_str(),
                energy_balance = spec.energy_balance,
                "actor created in transaction"
            );
        }

        if let Some((envelope_id, spec, parent_id)) = create_envelope {
            self.envelope_store
                .create_in_tx(&mut tx, envelope_id, spec, parent_id.as_deref())
                .await?;
            info!(
                envelope_id = %envelope_id,
                actor_id = %spec.actor_id,
                grantor_id = %spec.grantor_id,
                budget = spec.budget,
                "envelope created in transaction"
            );
        }

        self.event_log.append_in_tx(&mut tx, event).await?;

        // The audit leaf uses the index and hash now present on the event.
        let log_index = event.log_index as u64;
        self.audit_log
            .append_leaf_in_tx(&mut tx, log_index, &event.event_hash)
            .await
            .map_err(|e| KernelError::Audit(e.to_string()))?;

        tx.commit().await?;
        info!(event_id = %event.id, log_index = event.log_index, "event committed");
        Ok(())
    }
757
758 fn validate_execute_payload(payload: &Value) -> KernelResult<String> {
763 Self::require_valid_oid(payload, "input_oid")?;
764 Self::require_valid_oid(payload, "output_oid")?;
765
766 if payload.get("exit_code").and_then(|v| v.as_i64()).is_none() {
767 return Err(KernelError::ExecutePayloadInvalid(
768 "missing or invalid exit_code (must be integer)".to_string(),
769 ));
770 }
771
772 let artifact_hash = Self::require_valid_oid(payload, "artifact_hash")?;
773 Ok(artifact_hash)
774 }
775
776 fn require_valid_oid(payload: &Value, field: &str) -> KernelResult<String> {
778 let val = payload
779 .get(field)
780 .and_then(|v| v.as_str())
781 .ok_or_else(|| KernelError::ExecutePayloadInvalid(format!("missing {field}")))?;
782
783 if !val.starts_with("sha256:") || val.len() != 71 {
784 return Err(KernelError::ExecutePayloadInvalid(format!(
785 "{field} must be sha256:<64 hex chars>, got: {val}"
786 )));
787 }
788 let hex_part = &val[7..];
789 if !hex_part.chars().all(|c| c.is_ascii_hexdigit()) {
790 return Err(KernelError::ExecutePayloadInvalid(format!(
791 "{field} contains non-hex characters: {val}"
792 )));
793 }
794
795 Ok(val.to_string())
796 }
797
798 async fn read_query(&self, query: ReadQuery) -> KernelResult<Value> {
799 let requester = query.requester_id.as_deref().unwrap_or("anonymous");
804 check_read_access(requester, &query.kind)?;
805
806 match query.kind.as_str() {
807 "health" => Ok(json!({ "status": "ok" })),
808 "actor_energy" => {
809 let actor_id = query.actor_id.ok_or_else(|| {
810 KernelError::InvalidRequest("actor_id is required for actor_energy".to_string())
811 })?;
812 let (energy_balance, reserved_energy) =
813 self.energy_ledger.balance_view(&actor_id).await?;
814 Ok(json!({
815 "actor_id": actor_id,
816 "energy_balance": energy_balance,
817 "reserved_energy": reserved_energy
818 }))
819 }
820 "events" => {
821 let limit = query.limit.unwrap_or(20).clamp(1, 500);
822 let events = self
823 .event_log
824 .query(
825 query.actor_id.as_deref(),
826 query.before_index,
827 query.after_index,
828 limit,
829 )
830 .await?;
831 let next_cursor = events.last().map(|e| e.log_index);
834 let has_more = events.len() as i64 == limit;
835 Ok(json!({
836 "events": events,
837 "has_more": has_more,
838 "next_cursor": next_cursor
839 }))
840 }
841 "stats" => {
842 let event_count = self.event_log.count().await?;
843 Ok(json!({ "event_count": event_count }))
844 }
845 "snapshot" => {
846 let event_count = self.event_log.count().await?;
849 self.audit_log
850 .ensure_checkpoint(event_count as u64)
851 .await
852 .map_err(|e| KernelError::Audit(e.to_string()))?;
853 let cp = self
854 .audit_log
855 .latest_checkpoint()
856 .await
857 .map_err(|e| KernelError::Audit(e.to_string()))?;
858 Ok(json!({
859 "event_count": event_count,
860 "snapshot_hash": cp.root_hash,
861 "generated_at": cp.created_at
862 }))
863 }
864 "paths" => {
865 let paths = self.state_store.paths();
866 Ok(json!({
867 "root": paths.root.display().to_string(),
868 "workspace_root": paths.workspace_root.display().to_string(),
869 "quarantine_root": paths.quarantine_root.display().to_string(),
870 "db_path": paths.db_path.display().to_string()
871 }))
872 }
873 "audit_checkpoint" => {
874 let event_count = self.event_log.count().await?;
875 self.audit_log
876 .ensure_checkpoint(event_count as u64)
877 .await
878 .map_err(|e| KernelError::Audit(e.to_string()))?;
879 let cp = self
880 .audit_log
881 .latest_checkpoint()
882 .await
883 .map_err(|e| KernelError::Audit(e.to_string()))?;
884 Ok(serde_json::to_value(cp)?)
885 }
886 "audit_inclusion_proof" => {
887 let log_index = query.log_index.ok_or_else(|| {
888 KernelError::InvalidRequest(
889 "log_index is required for audit_inclusion_proof".to_string(),
890 )
891 })? as u64;
892 let tree_size = match query.tree_size {
893 Some(s) => s as u64,
894 None => {
895 let event_count = self.event_log.count().await? as u64;
897 self.audit_log
898 .ensure_checkpoint(event_count)
899 .await
900 .map_err(|e| KernelError::Audit(e.to_string()))?;
901 self.audit_log
902 .tree_size()
903 .await
904 .map_err(|e| KernelError::Audit(e.to_string()))?
905 }
906 };
907 let proof = self
908 .audit_log
909 .inclusion_proof(log_index, tree_size)
910 .await
911 .map_err(|e| KernelError::Audit(e.to_string()))?;
912 Ok(json!({
913 "log_index": log_index,
914 "tree_size": tree_size,
915 "proof": proof
916 }))
917 }
918 "audit_consistency_proof" => {
919 let old_size = query.old_size.ok_or_else(|| {
920 KernelError::InvalidRequest(
921 "old_size is required for audit_consistency_proof".to_string(),
922 )
923 })? as u64;
924 let new_size = query.tree_size.ok_or_else(|| {
925 KernelError::InvalidRequest(
926 "tree_size is required for audit_consistency_proof".to_string(),
927 )
928 })? as u64;
929 let proof = self
930 .audit_log
931 .consistency_proof(old_size, new_size)
932 .await
933 .map_err(|e| KernelError::Audit(e.to_string()))?;
934 Ok(json!({
935 "old_size": old_size,
936 "new_size": new_size,
937 "proof": proof
938 }))
939 }
940 "stellar_info" => {
942 let config = &self.stellar_config;
943 Ok(json!({
944 "gpu_model": config.gpu_model,
945 "cpu_model": config.cpu_model,
946 "int8_tops": config.int8_tops,
947 "energy_per_tick": config.effective_energy_per_tick(),
948 "tick_interval_ms": config.tick_interval_ms,
949 "luminosity_source": config.luminosity_source
950 }))
951 }
952 "envelope_info" => {
954 let actor_id = query.actor_id.ok_or_else(|| {
955 KernelError::InvalidRequest(
956 "actor_id is required for envelope_info".to_string(),
957 )
958 })?;
959 let envelope = self.envelope_store.get_active_for_actor(&actor_id).await?;
960 match envelope {
961 Some(record) => Ok(serde_json::to_value(record)?),
962 None => Ok(json!({ "actor_id": actor_id, "envelope": null })),
963 }
964 }
965 "actor_info" => {
967 let actor_id = query.actor_id.ok_or_else(|| {
968 KernelError::InvalidRequest("actor_id is required for actor_info".to_string())
969 })?;
970 let actor = self.actor_store.get(&actor_id).await?;
971 match actor {
972 Some(record) => Ok(serde_json::to_value(record)?),
973 None => Err(KernelError::ActorNotFound(actor_id)),
974 }
975 }
976 "hold_info" => {
978 let hold_id = query.hold_id.ok_or_else(|| {
979 KernelError::InvalidRequest("hold_id is required for hold_info".to_string())
980 })?;
981 let hold = self.envelope_store.get_hold_request(&hold_id).await?;
982 match hold {
983 Some(record) => Ok(record),
984 None => Err(KernelError::InvalidRequest(format!(
985 "hold_request not found: {hold_id}"
986 ))),
987 }
988 }
989 "holds_pending" => {
991 let holds = self
992 .envelope_store
993 .list_pending_holds(query.actor_id.as_deref())
994 .await?;
995 Ok(json!({ "holds": holds }))
996 }
997 other => Err(KernelError::InvalidRequest(format!(
998 "unsupported read query kind: {other}"
999 ))),
1000 }
1001 }
1002}
1003
/// Returns the current wall-clock time as decimal milliseconds since the Unix
/// epoch. A clock set before the epoch yields `"0"`.
fn now_millis_string() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis())
        .unwrap_or(0);
    millis.to_string()
}
1010
/// Returns the current wall-clock time in milliseconds since the Unix epoch,
/// or 0 if the clock is set before the epoch.
fn now_millis_u64() -> u64 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let elapsed = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch,
        Err(_) => Duration::default(),
    };
    // Truncating cast kept as-is: u128 milliseconds narrowed to u64.
    elapsed.as_millis() as u64
}
1017
1018fn parse_policy_version(action: &Action) -> Option<String> {
1021 if !matches!(action.action_type, ActionType::Create) || action.target != "system/policy" {
1022 return None;
1023 }
1024 action
1025 .payload
1026 .get("version")
1027 .and_then(|v| v.as_str())
1028 .map(|s| s.to_string())
1029}
1030
1031async fn parse_create_actor_spec(
1038 action: &Action,
1039 actor_store: &ActorStore,
1040) -> KernelResult<Option<CreateActorSpec>> {
1041 if !matches!(action.action_type, ActionType::Create) || action.target != "ledger/actor" {
1042 return Ok(None);
1043 }
1044
1045 let payload = action.payload.as_object().ok_or_else(|| {
1046 KernelError::InvalidRequest("seed actor payload must be an object".to_string())
1047 })?;
1048
1049 let explicit_id = payload.get("actor_id").and_then(Value::as_str);
1051 let purpose = payload
1052 .get("purpose")
1053 .and_then(Value::as_str)
1054 .unwrap_or("legacy");
1055 let energy_balance = payload
1056 .get("energy_balance")
1057 .and_then(Value::as_i64)
1058 .unwrap_or(1000);
1059
1060 let actor_type_str = payload
1062 .get("actor_type")
1063 .and_then(Value::as_str)
1064 .unwrap_or("agent");
1065 let actor_type = ActorType::parse(actor_type_str).ok_or_else(|| {
1066 KernelError::InvalidRequest(format!("invalid actor_type: {actor_type_str}"))
1067 })?;
1068
1069 let actor_id = if let Some(id) = explicit_id {
1071 id.to_string()
1072 } else {
1073 if purpose == "legacy" {
1074 return Err(KernelError::InvalidRequest(
1075 "actor_id or purpose is required to create an actor".to_string(),
1076 ));
1077 }
1078 let seq = actor_store.next_sequence(&action.actor_id, purpose).await?;
1079 derive_agent_id(&action.actor_id, purpose, seq)
1080 };
1081
1082 let creator_record = actor_store.get(&action.actor_id).await?;
1084 let (creator_type, creator_lineage) = match &creator_record {
1085 Some(record) => (record.actor_type.clone(), record.lineage.clone()),
1086 None => (ActorType::Human, vec![]),
1088 };
1089
1090 if creator_type == ActorType::Agent && actor_type == ActorType::Agent {
1092 return Err(KernelError::PolicyViolation(
1093 "agents cannot create agents — creation right belongs to humans (PIP-001 §5)"
1094 .to_string(),
1095 ));
1096 }
1097
1098 let lineage = build_lineage(&creator_type, &action.actor_id, &creator_lineage);
1099
1100 let writable_targets: Vec<WritableTarget> = if let Some(targets_val) =
1102 payload.get("writable_targets")
1103 {
1104 serde_json::from_value(targets_val.clone())
1105 .map_err(|e| KernelError::InvalidRequest(format!("invalid writable_targets: {e}")))?
1106 } else {
1107 vec![]
1108 };
1109
1110 if !writable_targets.is_empty()
1112 && let Some(ref creator) = creator_record
1113 {
1114 validate_child_targets(creator, &writable_targets)?;
1115 }
1116
1117 let energy_share = payload
1119 .get("energy_share")
1120 .and_then(Value::as_f64)
1121 .unwrap_or(0.0);
1122
1123 let reduction_policy = payload
1124 .get("reduction_policy")
1125 .and_then(Value::as_str)
1126 .unwrap_or("none")
1127 .to_string();
1128
1129 Ok(Some(CreateActorSpec {
1130 actor_id,
1131 actor_type,
1132 creator_id: action.actor_id.clone(),
1133 lineage,
1134 purpose: Some(purpose.to_string()),
1135 writable_targets,
1136 energy_balance,
1137 energy_share,
1138 reduction_policy,
1139 }))
1140}
1141
1142async fn parse_create_envelope_spec(
1151 action: &Action,
1152 envelope_store: &EnvelopeStore,
1153) -> KernelResult<Option<(String, EnvelopeSpec, Option<String>)>> {
1154 if !matches!(action.action_type, ActionType::Create) || action.target != "ledger/envelope" {
1155 return Ok(None);
1156 }
1157
1158 let payload = action.payload.as_object().ok_or_else(|| {
1159 KernelError::InvalidRequest("envelope payload must be an object".to_string())
1160 })?;
1161
1162 let actor_id = payload
1163 .get("actor_id")
1164 .and_then(Value::as_str)
1165 .ok_or_else(|| {
1166 KernelError::InvalidRequest("actor_id is required to create an envelope".to_string())
1167 })?
1168 .to_string();
1169
1170 let budget = payload
1171 .get("budget")
1172 .and_then(Value::as_i64)
1173 .ok_or_else(|| {
1174 KernelError::InvalidRequest("budget is required to create an envelope".to_string())
1175 })?;
1176
1177 if budget <= 0 {
1178 return Err(KernelError::InvalidRequest(
1179 "envelope budget must be positive".to_string(),
1180 ));
1181 }
1182
1183 let targets: Vec<String> = if let Some(targets_val) = payload.get("targets") {
1184 serde_json::from_value(targets_val.clone())
1185 .map_err(|e| KernelError::InvalidRequest(format!("invalid envelope targets: {e}")))?
1186 } else {
1187 return Err(KernelError::InvalidRequest(
1188 "targets are required to create an envelope".to_string(),
1189 ));
1190 };
1191
1192 let actions: Vec<String> = if let Some(actions_val) = payload.get("actions") {
1193 serde_json::from_value(actions_val.clone())
1194 .map_err(|e| KernelError::InvalidRequest(format!("invalid envelope actions: {e}")))?
1195 } else {
1196 return Err(KernelError::InvalidRequest(
1197 "actions are required to create an envelope".to_string(),
1198 ));
1199 };
1200
1201 let duration_secs = payload.get("duration_secs").and_then(Value::as_i64);
1202 let report_every = payload.get("report_every").and_then(Value::as_i64);
1203 let hold_timeout_secs = payload.get("hold_timeout_secs").and_then(Value::as_i64);
1204
1205 let hold_on: Vec<HoldRule> = if let Some(hold_on_val) = payload.get("hold_on") {
1207 serde_json::from_value(hold_on_val.clone())
1208 .map_err(|e| KernelError::InvalidRequest(format!("invalid hold_on rules: {e}")))?
1209 } else {
1210 vec![]
1211 };
1212
1213 let spec = EnvelopeSpec {
1214 actor_id,
1215 grantor_id: action.actor_id.clone(),
1216 budget,
1217 targets,
1218 actions,
1219 duration_secs,
1220 report_every,
1221 hold_on,
1222 hold_timeout_secs,
1223 };
1224
1225 let parent_envelope_id = if let Some(grantor_envelope) = envelope_store
1227 .get_active_for_actor(&action.actor_id)
1228 .await?
1229 {
1230 consent::validate_envelope_reduction(&grantor_envelope, &spec)?;
1231 Some(grantor_envelope.envelope_id)
1232 } else {
1233 None
1234 };
1235
1236 let envelope_id = Uuid::new_v4().to_string();
1237
1238 Ok(Some((envelope_id, spec, parent_envelope_id)))
1239}
1240
1241fn parse_hold_response(target: &str, payload: &Value) -> Option<(String, String, Option<String>)> {
1249 let rest = target.strip_prefix("ledger/hold/")?;
1251 if rest.is_empty() || rest.contains('/') {
1253 return None;
1254 }
1255 let hold_id = rest.to_string();
1256
1257 let decision = payload.get("decision").and_then(Value::as_str)?.to_string();
1258
1259 if decision != "approve" && decision != "reject" {
1261 return None;
1262 }
1263
1264 let instruction = payload
1265 .get("instruction")
1266 .and_then(Value::as_str)
1267 .map(|s| s.to_string());
1268
1269 Some((hold_id, decision, instruction))
1270}
1271
1272impl Kernel {
1273 async fn execute_hold_response(
1283 &self,
1284 human_action: &Action,
1285 hold_id: &str,
1286 decision: &str,
1287 instruction: Option<&str>,
1288 ) -> KernelResult<SubmitReceipt> {
1289 let caller = self
1291 .actor_store
1292 .get(&human_action.actor_id)
1293 .await?
1294 .ok_or_else(|| KernelError::ActorNotFound(human_action.actor_id.clone()))?;
1295
1296 if caller.actor_type != punkgo_core::actor::ActorType::Human {
1297 return Err(KernelError::PolicyViolation(format!(
1298 "only Human actors can resolve holds; caller {} is {:?}",
1299 human_action.actor_id, caller.actor_type
1300 )));
1301 }
1302
1303 let hold_record = self
1305 .envelope_store
1306 .get_hold_request(hold_id)
1307 .await?
1308 .ok_or_else(|| {
1309 KernelError::PolicyViolation(format!("hold_request not found: {hold_id}"))
1310 })?;
1311
1312 let _envelope_id = hold_record
1313 .get("envelope_id")
1314 .and_then(Value::as_str)
1315 .ok_or_else(|| {
1316 KernelError::PolicyViolation("hold_request missing envelope_id".to_string())
1317 })?
1318 .to_string();
1319
1320 let agent_id = hold_record
1321 .get("agent_id")
1322 .and_then(Value::as_str)
1323 .ok_or_else(|| {
1324 KernelError::PolicyViolation("hold_request missing agent_id".to_string())
1325 })?
1326 .to_string();
1327
1328 let pending_payload = hold_record
1329 .get("pending_payload")
1330 .cloned()
1331 .unwrap_or(Value::Null);
1332
1333 let trigger_target = hold_record
1334 .get("trigger_target")
1335 .and_then(Value::as_str)
1336 .unwrap_or("")
1337 .to_string();
1338
1339 let trigger_action = hold_record
1340 .get("trigger_action")
1341 .and_then(Value::as_str)
1342 .unwrap_or("")
1343 .to_string();
1344
1345 let status = hold_record
1346 .get("status")
1347 .and_then(Value::as_str)
1348 .unwrap_or("");
1349 if status != "pending" {
1350 return Err(KernelError::PolicyViolation(format!(
1351 "hold_request {hold_id} is already resolved (status={status})"
1352 )));
1353 }
1354
1355 let envelope_id_str = hold_record
1358 .get("envelope_id")
1359 .and_then(Value::as_str)
1360 .unwrap_or("")
1361 .to_string();
1362 if let Ok(Some(env)) = self.envelope_store.get(&envelope_id_str).await {
1363 if let Some(timeout_secs) = env.hold_timeout_secs {
1364 if timeout_secs > 0 {
1365 let triggered_at: u64 = hold_record
1366 .get("triggered_at")
1367 .and_then(|v| v.as_str())
1368 .and_then(|s| s.parse().ok())
1369 .unwrap_or(0);
1370 let now = now_millis_u64();
1371 if now.saturating_sub(triggered_at) > (timeout_secs as u64) * 1000 {
1372 self.expire_timed_out_holds(&envelope_id_str, timeout_secs)
1374 .await?;
1375 return Err(KernelError::PolicyViolation(format!(
1376 "hold_request {hold_id} has timed out and was auto-rejected"
1377 )));
1378 }
1379 }
1380 }
1381 }
1382
1383 let response_payload = json!({
1385 "hold_id": hold_id,
1386 "agent_id": &agent_id,
1387 "decision": decision,
1388 "instruction": instruction,
1389 "trigger": {
1390 "target": &trigger_target,
1391 "action_type": &trigger_action
1392 },
1393 "resolved_by": &human_action.actor_id,
1394 "resolved_at": now_millis_string()
1395 });
1396
1397 let response_action = Action {
1398 actor_id: human_action.actor_id.clone(),
1399 action_type: ActionType::Mutate,
1400 target: format!("ledger/hold/{hold_id}"),
1401 payload: response_payload.clone(),
1402 timestamp: None,
1403 };
1404
1405 let pool = self.state_store.pool();
1408 let mut tx = pool.begin().await?;
1409
1410 self.envelope_store
1411 .resolve_hold_request_in_tx(&mut tx, hold_id, decision, instruction)
1412 .await?;
1413
1414 let hold_reserved_cost = pending_payload
1417 .get("reserved_cost")
1418 .and_then(|v| v.as_i64())
1419 .unwrap_or(0);
1420
1421 let commitment_cost = if decision == "reject" && hold_reserved_cost > 0 {
1422 let cost = ((hold_reserved_cost as f64) * 0.2).ceil() as i64;
1423 self.energy_ledger
1424 .settle_in_tx(&mut tx, &agent_id, hold_reserved_cost, cost)
1425 .await?;
1426 cost
1427 } else {
1428 0
1429 };
1430
1431 let mut response_event = EventRecord {
1432 id: Uuid::new_v4().to_string(),
1433 log_index: 0,
1434 event_hash: String::new(),
1435 actor_id: human_action.actor_id.clone(),
1436 action_type: "hold_response".to_string(),
1437 target: format!("ledger/hold/{hold_id}"),
1438 payload: response_payload.clone(),
1439 payload_hash: payload_hash_hex(&response_action)?,
1440 artifact_hash: None,
1441 reserved_energy: hold_reserved_cost,
1442 settled_energy: commitment_cost,
1443 timestamp: now_millis_string(),
1444 };
1445 self.event_log
1446 .append_in_tx(&mut tx, &mut response_event)
1447 .await?;
1448
1449 let log_index = response_event.log_index as u64;
1451 self.audit_log
1452 .append_leaf_in_tx(&mut tx, log_index, &response_event.event_hash)
1453 .await
1454 .map_err(|e| KernelError::Audit(e.to_string()))?;
1455 tx.commit().await?;
1457
1458 info!(
1459 hold_id = %hold_id,
1460 decision = %decision,
1461 agent_id = %agent_id,
1462 resolved_by = %human_action.actor_id,
1463 "PIP-001 §11d: hold resolved"
1464 );
1465
1466 if decision == "approve" {
1468 let orig_target = pending_payload
1470 .get("target")
1471 .and_then(Value::as_str)
1472 .unwrap_or(&trigger_target)
1473 .to_string();
1474 let orig_action_type_str = pending_payload
1475 .get("action_type")
1476 .and_then(Value::as_str)
1477 .unwrap_or(&trigger_action)
1478 .to_string();
1479 let mut orig_payload = pending_payload
1480 .get("payload")
1481 .cloned()
1482 .unwrap_or(Value::Null);
1483
1484 if let Some(obj) = orig_payload.as_object_mut() {
1491 obj.insert("_hold_approved".to_string(), Value::Bool(true));
1492 obj.insert(
1493 "_hold_reserved_cost".to_string(),
1494 Value::Number(hold_reserved_cost.into()),
1495 );
1496 if let Some(instr) = instruction {
1498 obj.insert(
1499 "_hold_instruction".to_string(),
1500 Value::String(instr.to_string()),
1501 );
1502 }
1503 } else {
1504 let mut obj = serde_json::Map::new();
1506 obj.insert("_hold_approved".to_string(), Value::Bool(true));
1507 obj.insert(
1508 "_hold_reserved_cost".to_string(),
1509 Value::Number(hold_reserved_cost.into()),
1510 );
1511 if let Some(instr) = instruction {
1512 obj.insert(
1513 "_hold_instruction".to_string(),
1514 Value::String(instr.to_string()),
1515 );
1516 }
1517 orig_payload = Value::Object(obj);
1518 }
1519
1520 let orig_action_type = match orig_action_type_str.as_str() {
1521 "observe" => ActionType::Observe,
1522 "create" => ActionType::Create,
1523 "mutate" => ActionType::Mutate,
1524 _ => ActionType::Execute,
1525 };
1526
1527 let pending_action = Action {
1528 actor_id: agent_id.clone(),
1529 action_type: orig_action_type,
1530 target: orig_target,
1531 payload: orig_payload,
1532 timestamp: None,
1533 };
1534
1535 info!(
1536 hold_id = %hold_id,
1537 agent_id = %agent_id,
1538 "PIP-001 §11d: re-executing approved pending action"
1539 );
1540
1541 return Box::pin(self.submit_action(pending_action)).await;
1545 }
1546
1547 Ok(SubmitReceipt {
1549 event_id: response_event.id,
1550 log_index: response_event.log_index,
1551 event_hash: response_event.event_hash,
1552 reserved_cost: hold_reserved_cost,
1553 settled_cost: commitment_cost,
1554 artifact_hash: None,
1555 })
1556 }
1557
1558 async fn expire_timed_out_holds(
1565 &self,
1566 envelope_id: &str,
1567 hold_timeout_secs: i64,
1568 ) -> KernelResult<()> {
1569 let holds = self
1570 .envelope_store
1571 .list_pending_holds_for_envelope(envelope_id)
1572 .await?;
1573 let now = now_millis_u64();
1574
1575 for hold in holds {
1576 let triggered_at: u64 = hold
1577 .get("triggered_at")
1578 .and_then(|v| v.as_str())
1579 .and_then(|s| s.parse().ok())
1580 .unwrap_or(0);
1581
1582 if now.saturating_sub(triggered_at) <= (hold_timeout_secs as u64) * 1000 {
1583 continue;
1584 }
1585
1586 let hold_id = hold.get("hold_id").and_then(|v| v.as_str()).unwrap_or("");
1588 let agent_id = hold.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
1589 let pending_payload = hold.get("pending_payload").cloned().unwrap_or(Value::Null);
1590 let reserved_cost = pending_payload
1591 .get("reserved_cost")
1592 .and_then(|v| v.as_i64())
1593 .unwrap_or(0);
1594
1595 let commitment_cost = if reserved_cost > 0 {
1596 ((reserved_cost as f64) * 0.2).ceil() as i64
1597 } else {
1598 0
1599 };
1600
1601 let timeout_payload = json!({
1602 "hold_id": hold_id,
1603 "agent_id": agent_id,
1604 "decision": "timed_out",
1605 "reserved_cost": reserved_cost,
1606 "commitment_cost": commitment_cost,
1607 "triggered_at": triggered_at.to_string(),
1608 "expired_at": now.to_string()
1609 });
1610 let timeout_action = Action {
1611 actor_id: agent_id.to_string(),
1612 action_type: ActionType::Mutate,
1613 target: format!("ledger/hold/{hold_id}"),
1614 payload: timeout_payload.clone(),
1615 timestamp: None,
1616 };
1617 let mut timeout_event = EventRecord {
1618 id: Uuid::new_v4().to_string(),
1619 log_index: 0,
1620 event_hash: String::new(),
1621 actor_id: agent_id.to_string(),
1622 action_type: "hold_timeout".to_string(),
1623 target: format!("ledger/hold/{hold_id}"),
1624 payload: timeout_payload,
1625 payload_hash: payload_hash_hex(&timeout_action)?,
1626 artifact_hash: None,
1627 reserved_energy: reserved_cost,
1628 settled_energy: commitment_cost,
1629 timestamp: now_millis_string(),
1630 };
1631
1632 let pool = self.state_store.pool();
1633 let mut tx = pool.begin().await?;
1634 self.envelope_store
1635 .resolve_hold_request_in_tx(&mut tx, hold_id, "timed_out", None)
1636 .await?;
1637 if reserved_cost > 0 {
1638 self.energy_ledger
1639 .settle_in_tx(&mut tx, agent_id, reserved_cost, commitment_cost)
1640 .await?;
1641 }
1642 self.event_log
1643 .append_in_tx(&mut tx, &mut timeout_event)
1644 .await?;
1645
1646 let t_log_index = timeout_event.log_index as u64;
1648 self.audit_log
1649 .append_leaf_in_tx(&mut tx, t_log_index, &timeout_event.event_hash)
1650 .await
1651 .map_err(|e| KernelError::Audit(e.to_string()))?;
1652 tx.commit().await?;
1654
1655 info!(
1656 hold_id = %hold_id,
1657 agent_id = %agent_id,
1658 commitment_cost = commitment_cost,
1659 "PIP-001 §11e: hold auto-expired (timeout)"
1660 );
1661 }
1662 Ok(())
1663 }
1664}