1use std::path::PathBuf;
2
3use serde::{Deserialize, Serialize};
4use serde_json::{Value, json};
5use tracing::{info, warn};
6use uuid::Uuid;
7
8use crate::audit::AuditLog;
9use punkgo_core::action::{Action, ActionType, payload_hash_hex, quote_cost};
10use punkgo_core::actor::{
11 ActorType, CreateActorSpec, WritableTarget, build_lineage, derive_agent_id,
12};
13use punkgo_core::boundary::{check_writable_boundary, validate_child_targets};
14use punkgo_core::consent::{self, AuthorizationMode, CheckpointLevel, EnvelopeSpec, HoldRule};
15use punkgo_core::errors::{KernelError, KernelResult};
16use punkgo_core::policy::{check_read_access, validate_action};
17
18use super::lifecycle;
19use crate::state::{
20 ActorStore, EnergyLedger, EnergyReservation, EnvelopeStore, EventLog, EventRecord,
21 NewHoldRequest, StateStore,
22};
23use punkgo_core::protocol::{RequestEnvelope, RequestType, ResponseEnvelope};
24use punkgo_core::stellar::{StellarConfig, load_stellar_config};
25
/// Startup configuration for the kernel.
#[derive(Debug, Clone)]
pub struct KernelConfig {
    /// Directory passed to `StateStore::bootstrap`; `stellar.toml` is also
    /// looked up inside this directory during `Kernel::bootstrap`.
    pub state_dir: PathBuf,
    /// IPC endpoint name. The `Default` impl sets it to `"punkgo-kernel"`;
    /// it is not read anywhere in this part of the file.
    pub ipc_endpoint: String,
}
32
33impl Default for KernelConfig {
34 fn default() -> Self {
35 let state_dir = std::env::var("PUNKGO_STATE_DIR")
36 .map(PathBuf::from)
37 .unwrap_or_else(|_| default_state_dir());
38 Self {
39 state_dir,
40 ipc_endpoint: "punkgo-kernel".to_string(),
41 }
42 }
43}
44
45fn default_state_dir() -> PathBuf {
49 if let Some(home) = home_dir() {
50 return home.join(".punkgo").join("state");
51 }
52 PathBuf::from("state")
53}
54
/// Locates the current user's home directory from environment variables.
///
/// Lookup order: `HOME`, then `USERPROFILE`, then `HOMEDRIVE` joined with
/// `HOMEPATH`. Returns `None` when none of these yields a value.
///
/// A variable that is set but empty is treated as unset. Otherwise an empty
/// `HOME` would be accepted as the home directory and the derived state
/// directory would silently become relative to the current working directory.
fn home_dir() -> Option<PathBuf> {
    // Reads a variable, discarding values that are present but empty.
    let non_empty = |key: &str| std::env::var_os(key).filter(|value| !value.is_empty());

    if let Some(home) = non_empty("HOME") {
        return Some(PathBuf::from(home));
    }
    if let Some(profile) = non_empty("USERPROFILE") {
        return Some(PathBuf::from(profile));
    }

    // Both halves are required; either one missing means no home directory.
    let drive = non_empty("HOMEDRIVE")?;
    let path = non_empty("HOMEPATH")?;
    let mut combined = PathBuf::from(drive);
    combined.push(path);
    Some(combined)
}
71
/// Receipt returned to the caller after `submit_action` commits an event.
#[derive(Debug, Clone, Serialize)]
pub struct SubmitReceipt {
    /// Identifier of the committed event record.
    pub event_id: String,
    /// Index of the event in the event log, as filled in by the log append.
    pub log_index: i64,
    /// Hash of the committed event record.
    pub event_hash: String,
    /// Energy reserved for the action.
    pub reserved_cost: i64,
    /// Energy settled for the action; `submit_action` currently sets this
    /// equal to `reserved_cost`.
    pub settled_cost: i64,
    /// Artifact hash taken from the payload; only populated for `Execute` actions.
    pub artifact_hash: Option<String>,
}
86
/// Payload of a `Read` request, deserialized from the request envelope.
///
/// `kind` selects the query; the remaining fields are optional and only
/// consulted by the kinds that need them (see `Kernel::read_query`).
#[derive(Debug, Deserialize)]
struct ReadQuery {
    /// Query selector, e.g. `"health"`, `"events"`, `"actor_info"`.
    kind: String,
    /// Target actor; required by `actor_energy`, `envelope_info` and
    /// `actor_info`, optional filter for `events` and `holds_pending`.
    actor_id: Option<String>,
    /// Hold identifier; required by `hold_info`.
    #[serde(default)]
    hold_id: Option<String>,
    /// Page size for `events`; defaults to 20 and is clamped to 1..=500.
    limit: Option<i64>,
    /// Leaf index; required by `audit_inclusion_proof`.
    log_index: Option<i64>,
    /// Tree size; optional for `audit_inclusion_proof`, required (as the new
    /// size) for `audit_consistency_proof`.
    tree_size: Option<i64>,
    /// Old tree size; required by `audit_consistency_proof`.
    old_size: Option<i64>,
    /// Cursor forwarded to the event log query for `events`.
    #[serde(default)]
    before_index: Option<i64>,
    /// Cursor forwarded to the event log query for `events`.
    #[serde(default)]
    after_index: Option<i64>,
    /// Identity used for the read-access check; `"anonymous"` when absent.
    #[serde(default)]
    requester_id: Option<String>,
}
115
/// The kernel: owns the state store and the per-concern stores built on top
/// of its connection pool, plus the loaded stellar configuration.
pub struct Kernel {
    /// Backing state store; source of the SQLite pool used by everything else.
    state_store: StateStore,
    /// Energy reservation and settlement.
    energy_ledger: EnergyLedger,
    /// Append-only event log.
    event_log: EventLog,
    /// Audit log (leaves, checkpoints, proofs), created with the
    /// `"punkgo/kernel"` label.
    audit_log: AuditLog,
    /// Actor records.
    actor_store: ActorStore,
    /// Envelopes and hold requests.
    envelope_store: EnvelopeStore,
    /// Configuration loaded from `stellar.toml` in the state directory.
    stellar_config: StellarConfig,
}
132
133impl Kernel {
134 pub async fn bootstrap(config: &KernelConfig) -> KernelResult<Self> {
138 let state_store = StateStore::bootstrap(&config.state_dir).await?;
139 let energy_ledger = EnergyLedger::new(state_store.pool());
140 let event_log = EventLog::new(state_store.pool());
141 let audit_log = AuditLog::new(state_store.pool(), "punkgo/kernel");
142 let actor_store = ActorStore::new(state_store.pool());
143 let envelope_store = EnvelopeStore::new(state_store.pool());
144
145 let stellar_config_path = config.state_dir.join("stellar.toml");
147 let stellar_config = load_stellar_config(&stellar_config_path)?;
148 info!(
149 energy_per_tick = stellar_config.effective_energy_per_tick(),
150 int8_tops = stellar_config.int8_tops,
151 tick_interval_ms = stellar_config.tick_interval_ms,
152 "stellar configuration loaded"
153 );
154
155 Ok(Self {
156 state_store,
157 energy_ledger,
158 event_log,
159 audit_log,
160 actor_store,
161 envelope_store,
162 stellar_config,
163 })
164 }
165
    /// Returns the stellar configuration loaded at bootstrap.
    pub fn stellar_config(&self) -> &StellarConfig {
        &self.stellar_config
    }
171
    /// Returns the kernel's energy ledger.
    pub fn energy_ledger(&self) -> &EnergyLedger {
        &self.energy_ledger
    }
176
    /// Returns the kernel's actor store.
    pub fn actor_store(&self) -> &ActorStore {
        &self.actor_store
    }
181
    /// Returns the kernel's envelope store.
    pub fn envelope_store(&self) -> &EnvelopeStore {
        &self.envelope_store
    }
186
    /// Returns the SQLite pool obtained from the state store.
    pub fn pool(&self) -> sqlx::SqlitePool {
        self.state_store.pool()
    }
191
192 pub async fn handle_request(&self, req: RequestEnvelope) -> ResponseEnvelope {
194 let request_id = req.request_id.clone();
195 info!(
196 request_id = %request_id,
197 request_type = ?req.request_type,
198 "received request"
199 );
200 match self.dispatch(req).await {
201 Ok(payload) => {
202 info!(request_id = %request_id, "request completed");
203 ResponseEnvelope::ok(request_id, payload)
204 }
205 Err(err) => {
206 warn!(error = %err, "request failed");
207 ResponseEnvelope::err_structured(request_id, &err)
208 }
209 }
210 }
211
212 async fn dispatch(&self, req: RequestEnvelope) -> KernelResult<Value> {
213 match req.request_type {
214 RequestType::Quote => {
215 let action: Action = serde_json::from_value(req.payload)?;
216 validate_action(&action)?;
217 let cost = quote_cost(&action);
218 Ok(json!({ "cost": cost }))
219 }
220 RequestType::Submit => {
221 let action: Action = serde_json::from_value(req.payload)?;
222 let receipt = self.submit_action(action).await?;
223 Ok(serde_json::to_value(receipt)?)
224 }
225 RequestType::Read => {
226 let query: ReadQuery = serde_json::from_value(req.payload)?;
227 self.read_query(query).await
228 }
229 }
230 }
231
    /// Validates, authorizes, charges and commits a single action.
    ///
    /// Rough sequence:
    /// 1. Validate the action and make sure the actor exists.
    /// 2. For a known actor: frozen check, writable-boundary check, lineage
    ///    check (agents only), then envelope authorization for state-changing
    ///    actions. A matching hold rule records a hold request and returns
    ///    `KernelError::HoldTriggered` instead of executing the action.
    /// 3. A `Mutate` on `ledger/hold/<id>` carrying a decision is handed to
    ///    `execute_hold_response`.
    /// 4. Lifecycle operations are authorized before anything is committed.
    /// 5. Energy is reserved (or a prior hold reservation is reused), the
    ///    event is committed with `finalize_energy_and_event`.
    /// 6. Post-commit follow-ups: policy version, envelope budget and
    ///    checkpoint handling, lifecycle execution.
    ///
    /// # Errors
    /// Returns the first validation, authorization, storage or audit error.
    /// See the NOTE(review) comments for the errors that can be raised after
    /// the event has already been committed.
    async fn submit_action(&self, action: Action) -> KernelResult<SubmitReceipt> {
        validate_action(&action)?;
        if !self.state_store.actor_exists(&action.actor_id).await? {
            return Err(KernelError::ActorNotFound(action.actor_id.clone()));
        }

        // Per-actor checks only run when the actor store has a record for the
        // caller; an actor known to the state store but absent here skips them.
        if let Some(actor) = self.actor_store.get(&action.actor_id).await? {
            if actor.status == punkgo_core::actor::ActorStatus::Frozen
                && action.action_type.is_state_changing()
            {
                return Err(KernelError::ActorFrozen(format!(
                    "actor {} is frozen and cannot perform state-changing actions",
                    action.actor_id
                )));
            }
            check_writable_boundary(&actor, &action.target, &action.action_type)?;

            // Agents additionally need their lineage checked before changing state.
            if actor.actor_type == punkgo_core::actor::ActorType::Agent
                && action.action_type.is_state_changing()
            {
                lifecycle::check_lineage_active(&self.actor_store, &actor.lineage).await?;
            }

            if action.action_type.is_state_changing() {
                let envelope = self
                    .envelope_store
                    .get_active_for_actor(&action.actor_id)
                    .await?;

                // An envelope that has expired is marked `Expired` in storage
                // and then treated as absent for the authorization check.
                let envelope = if let Some(env) = envelope {
                    if consent::is_envelope_expired(&env, now_millis_u64()) {
                        self.envelope_store
                            .set_status(&env.envelope_id, &consent::EnvelopeStatus::Expired)
                            .await?;
                        None
                    } else {
                        Some(env)
                    }
                } else {
                    None
                };

                let auth_mode = consent::check_authorization(&actor, envelope.as_ref())?;

                if auth_mode == AuthorizationMode::ManOnTheLoop
                    && let Some(ref env) = envelope
                {
                    consent::check_envelope_covers(
                        env,
                        &action.target,
                        action.action_type.as_str(),
                    )?;

                    // Expire timed-out holds first, but only when the envelope
                    // has hold rules and a positive timeout.
                    if !env.hold_on.is_empty() {
                        if let Some(timeout_secs) = env.hold_timeout_secs {
                            if timeout_secs > 0 {
                                self.expire_timed_out_holds(&env.envelope_id, timeout_secs)
                                    .await?;
                            }
                        }
                    }

                    // NOTE(review): `_hold_approved` is read from the submitted
                    // payload. Nothing visible here prevents an agent from
                    // setting it itself to skip the hold check — confirm that
                    // untrusted submissions cannot carry this key.
                    let hold_approved = action
                        .payload
                        .get("_hold_approved")
                        .and_then(|v| v.as_bool())
                        .unwrap_or(false);
                    if !hold_approved
                        && consent::check_hold_trigger(
                            &env.hold_on,
                            &action.target,
                            action.action_type.as_str(),
                        )
                    {
                        let hold_id = Uuid::new_v4().to_string();

                        let reserved_cost = quote_cost(&action) as i64;

                        // Event payload describing the hold and the action it suspends.
                        let hold_payload = json!({
                            "hold_id": &hold_id,
                            "agent_id": &action.actor_id,
                            "trigger": {
                                "target": &action.target,
                                "action_type": action.action_type.as_str()
                            },
                            "pending_action": {
                                "target": &action.target,
                                "action_type": action.action_type.as_str(),
                                "payload": &action.payload
                            },
                            "reserved_cost": reserved_cost,
                            "triggered_at": now_millis_string()
                        });

                        // Synthetic action used only to compute the payload hash
                        // of the hold event; it is never submitted.
                        let hold_action = Action {
                            actor_id: action.actor_id.clone(),
                            action_type: ActionType::Mutate,
                            target: format!("ledger/hold/{hold_id}"),
                            payload: hold_payload.clone(),
                            timestamp: None,
                        };
                        // `log_index` and `event_hash` are placeholders; they are
                        // read back after `append_in_tx` below.
                        let mut hold_event = EventRecord {
                            id: Uuid::new_v4().to_string(),
                            log_index: 0,
                            event_hash: String::new(),
                            actor_id: action.actor_id.clone(),
                            action_type: "hold_request".to_string(),
                            target: format!("ledger/hold/{hold_id}"),
                            payload: hold_payload.clone(),
                            payload_hash: payload_hash_hex(&hold_action)?,
                            artifact_hash: None,
                            reserved_energy: reserved_cost,
                            settled_energy: 0,
                            timestamp: now_millis_string(),
                        };

                        // Reservation, event, hold request, audit leaf and
                        // checkpoint are all written in one transaction.
                        let pool = self.state_store.pool();
                        let mut tx = pool.begin().await?;
                        if reserved_cost > 0 {
                            self.energy_ledger
                                .reserve_in_tx(&mut tx, &action.actor_id, reserved_cost)
                                .await?;
                        }
                        self.event_log
                            .append_in_tx(&mut tx, &mut hold_event)
                            .await?;
                        self.envelope_store
                            .create_hold_request_in_tx(
                                &mut tx,
                                &NewHoldRequest {
                                    hold_id: &hold_id,
                                    envelope_id: &env.envelope_id,
                                    agent_id: &action.actor_id,
                                    trigger_target: &action.target,
                                    trigger_action: action.action_type.as_str(),
                                    pending_payload: &json!({
                                        "target": &action.target,
                                        "action_type": action.action_type.as_str(),
                                        "payload": &action.payload,
                                        "reserved_cost": reserved_cost
                                    }),
                                },
                            )
                            .await?;
                        // The tree size passed to the checkpoint is the new
                        // leaf's index plus one.
                        let log_index = hold_event.log_index as u64;
                        let tree_size = log_index + 1;
                        self.audit_log
                            .append_leaf_in_tx(&mut tx, log_index, &hold_event.event_hash)
                            .await
                            .map_err(|e| KernelError::Audit(e.to_string()))?;
                        self.audit_log
                            .make_checkpoint_in_tx(&mut tx, tree_size)
                            .await
                            .map_err(|e| KernelError::Audit(e.to_string()))?;

                        tx.commit().await?;

                        // The hold is reported to the caller as an error; the
                        // original action has not been executed.
                        return Err(KernelError::HoldTriggered {
                            hold_id,
                            agent_id: action.actor_id.clone(),
                        });
                    }
                }
            }
        }

        // A hold decision (approve/reject) is handled by a dedicated path.
        let hold_response = if matches!(action.action_type, ActionType::Mutate) {
            parse_hold_response(&action.target, &action.payload)
        } else {
            None
        };

        if let Some((ref hold_id, ref decision, ref instruction)) = hold_response {
            return self
                .execute_hold_response(&action, hold_id, decision, instruction.as_deref())
                .await;
        }

        // Lifecycle operations are authorized here and executed after commit.
        let lifecycle_op = if matches!(action.action_type, ActionType::Mutate) {
            lifecycle::parse_lifecycle_op(&action.target, &action.payload)
        } else {
            None
        };

        if let Some((ref target_actor_id, ref op)) = lifecycle_op {
            let initiator = self
                .actor_store
                .get(&action.actor_id)
                .await?
                .ok_or_else(|| KernelError::ActorNotFound(action.actor_id.clone()))?;
            let target_actor = self
                .actor_store
                .get(target_actor_id)
                .await?
                .ok_or_else(|| KernelError::ActorNotFound(target_actor_id.clone()))?;
            lifecycle::validate_lifecycle_authorization(&initiator, &target_actor, op).await?;
        }

        // Each parser returns `None` unless the action matches its target.
        let create_actor_spec = parse_create_actor_spec(&action, &self.actor_store).await?;
        let create_envelope_spec =
            parse_create_envelope_spec(&action, &self.envelope_store).await?;
        let policy_version = parse_policy_version(&action);

        // When the payload carries `_hold_reserved_cost`, no new reservation is
        // made; a reservation value is built from the payload instead.
        // Presumably the energy was already reserved when the hold was created —
        // TODO confirm against the hold-response path.
        // NOTE(review): this value also comes from the submitted payload and is
        // not range-checked here.
        let (reserved_cost, reservation) = if let Some(hold_reserved) = action
            .payload
            .get("_hold_reserved_cost")
            .and_then(|v| v.as_i64())
        {
            let phantom = if hold_reserved > 0 {
                Some(EnergyReservation {
                    actor_id: action.actor_id.clone(),
                    reserved: hold_reserved,
                })
            } else {
                None
            };
            (hold_reserved, phantom)
        } else {
            let cost = quote_cost(&action) as i64;
            let res = if cost > 0 {
                Some(self.energy_ledger.reserve(&action.actor_id, cost).await?)
            } else {
                None
            };
            (cost, res)
        };

        // NOTE(review): if this validation fails, the reservation taken just
        // above is not released on this path — confirm whether that is intended.
        let artifact_hash = if matches!(action.action_type, ActionType::Execute) {
            Some(Self::validate_execute_payload(&action.payload)?)
        } else {
            None
        };

        // The settled amount always equals the reserved amount here.
        let settled_cost = reserved_cost;

        let mut event = EventRecord {
            id: Uuid::new_v4().to_string(),
            log_index: 0,
            event_hash: String::new(),
            actor_id: action.actor_id.clone(),
            action_type: action.action_type.as_str().to_string(),
            target: action.target.clone(),
            payload: action.payload.clone(),
            payload_hash: payload_hash_hex(&action)?,
            artifact_hash: artifact_hash.clone(),
            reserved_energy: reserved_cost,
            settled_energy: settled_cost,
            timestamp: now_millis_string(),
        };
        self.finalize_energy_and_event(
            reservation.as_ref(),
            settled_cost,
            create_actor_spec.as_ref(),
            create_envelope_spec.as_ref(),
            &mut event,
        )
        .await?;

        // Everything below runs after the event has been committed.

        if let Some(ref version) = policy_version {
            if let Err(err) = self.state_store.set_policy_version(version).await {
                warn!(error = %err, "failed to record policy version after commit");
            } else {
                info!(policy_version = %version, event_id = %event.id, "policy version updated");
            }
        }

        // Envelope budget accounting applies to agents with an active envelope
        // and a positive settled cost. Lookup errors skip this step silently.
        if action.action_type.is_state_changing()
            && settled_cost > 0
            && let Ok(Some(actor)) = self.actor_store.get(&action.actor_id).await
            && actor.actor_type == ActorType::Agent
            && let Ok(Some(envelope)) = self
                .envelope_store
                .get_active_for_actor(&action.actor_id)
                .await
        {
            let checkpoint = consent::check_checkpoint(&envelope, settled_cost);

            // NOTE(review): the `?` on `begin`, `set_status_in_tx` and `commit`
            // below returns an error to the caller even though the event is
            // already committed, unlike the best-effort `Err` arm of the match.
            let pool = self.state_store.pool();
            let mut tx = pool.begin().await?;
            match self
                .envelope_store
                .consume_budget_in_tx(&mut tx, &envelope.envelope_id, settled_cost)
                .await
            {
                Ok(_new_consumed) => {
                    match checkpoint {
                        // Halt marks the envelope as expired in the same transaction.
                        Some(CheckpointLevel::Halt) => {
                            self.envelope_store
                                .set_status_in_tx(
                                    &mut tx,
                                    &envelope.envelope_id,
                                    &consent::EnvelopeStatus::Expired,
                                )
                                .await?;
                            info!(
                                envelope_id = %envelope.envelope_id,
                                actor_id = %action.actor_id,
                                "checkpoint: HALT — envelope budget exhausted"
                            );
                        }
                        // Report only logs a summary; nothing else is written.
                        Some(CheckpointLevel::Report) => {
                            info!(
                                envelope_id = %envelope.envelope_id,
                                actor_id = %action.actor_id,
                                budget = envelope.budget,
                                consumed = envelope.budget_consumed + settled_cost,
                                "checkpoint: REPORT — summary logged"
                            );
                        }
                        None => {}
                    }
                    tx.commit().await?;
                }
                Err(err) => {
                    // Best effort: `tx` is dropped here without being committed.
                    warn!(
                        error = %err,
                        "envelope budget consumption failed (event already committed)"
                    );
                }
            }
        }

        // Execute the lifecycle operation authorized earlier. Failures are only
        // logged because the event is already committed.
        if let Some((ref target_actor_id, ref op)) = lifecycle_op {
            let pool = self.state_store.pool();
            match op {
                punkgo_core::actor::LifecycleOp::Freeze { .. } => {
                    match lifecycle::execute_freeze(&self.actor_store, &pool, target_actor_id).await
                    {
                        Ok(frozen_ids) => {
                            info!(
                                target = %target_actor_id,
                                cascade_count = frozen_ids.len(),
                                "lifecycle: freeze executed"
                            );
                        }
                        Err(err) => {
                            warn!(error = %err, "lifecycle: freeze failed (event already committed)");
                        }
                    }
                }
                punkgo_core::actor::LifecycleOp::Unfreeze => {
                    match lifecycle::execute_unfreeze(&self.actor_store, &pool, target_actor_id)
                        .await
                    {
                        Ok(()) => {
                            info!(target = %target_actor_id, "lifecycle: unfreeze executed");
                        }
                        Err(err) => {
                            warn!(error = %err, "lifecycle: unfreeze failed (event already committed)");
                        }
                    }
                }
                // Terminate is carried out with the same `execute_freeze` call as Freeze.
                punkgo_core::actor::LifecycleOp::Terminate { .. } => {
                    match lifecycle::execute_freeze(&self.actor_store, &pool, target_actor_id).await
                    {
                        Ok(frozen_ids) => {
                            info!(
                                target = %target_actor_id,
                                cascade_count = frozen_ids.len(),
                                "lifecycle: terminate executed (cascade frozen)"
                            );
                        }
                        Err(err) => {
                            warn!(error = %err, "lifecycle: terminate failed (event already committed)");
                        }
                    }
                }
            }
        }

        Ok(SubmitReceipt {
            event_id: event.id,
            log_index: event.log_index,
            event_hash: event.event_hash,
            reserved_cost,
            settled_cost,
            artifact_hash,
        })
    }
661
662 async fn finalize_energy_and_event(
663 &self,
664 reservation: Option<&EnergyReservation>,
665 settled_cost: i64,
666 create_actor: Option<&CreateActorSpec>,
667 create_envelope: Option<&(String, EnvelopeSpec, Option<String>)>,
668 event: &mut EventRecord,
669 ) -> KernelResult<()> {
670 let pool = self.state_store.pool();
671 let mut tx = pool.begin().await?;
672
673 if let Some(res) = reservation {
675 self.energy_ledger
676 .settle_in_tx(&mut tx, &res.actor_id, res.reserved, settled_cost)
677 .await?;
678 }
679
680 if let Some(spec) = create_actor {
682 self.energy_ledger
683 .create_actor_in_tx(&mut tx, &spec.actor_id, spec.energy_balance)
684 .await?;
685 self.actor_store.create_in_tx(&mut tx, spec).await?;
686 info!(
687 created_actor = %spec.actor_id,
688 actor_type = spec.actor_type.as_str(),
689 energy_balance = spec.energy_balance,
690 "actor created in transaction"
691 );
692 }
693
694 if let Some((envelope_id, spec, parent_id)) = create_envelope {
696 self.envelope_store
697 .create_in_tx(&mut tx, envelope_id, spec, parent_id.as_deref())
698 .await?;
699 info!(
700 envelope_id = %envelope_id,
701 actor_id = %spec.actor_id,
702 grantor_id = %spec.grantor_id,
703 budget = spec.budget,
704 "envelope created in transaction"
705 );
706 }
707
708 self.event_log.append_in_tx(&mut tx, event).await?;
710
711 let log_index = event.log_index as u64;
713 let tree_size = log_index + 1;
714 self.audit_log
715 .append_leaf_in_tx(&mut tx, log_index, &event.event_hash)
716 .await
717 .map_err(|e| KernelError::Audit(e.to_string()))?;
718 self.audit_log
719 .make_checkpoint_in_tx(&mut tx, tree_size)
720 .await
721 .map_err(|e| KernelError::Audit(e.to_string()))?;
722
723 tx.commit().await?;
724 info!(event_id = %event.id, log_index = event.log_index, "event committed");
725 Ok(())
726 }
727
728 fn validate_execute_payload(payload: &Value) -> KernelResult<String> {
733 Self::require_valid_oid(payload, "input_oid")?;
734 Self::require_valid_oid(payload, "output_oid")?;
735
736 if payload.get("exit_code").and_then(|v| v.as_i64()).is_none() {
737 return Err(KernelError::ExecutePayloadInvalid(
738 "missing or invalid exit_code (must be integer)".to_string(),
739 ));
740 }
741
742 let artifact_hash = Self::require_valid_oid(payload, "artifact_hash")?;
743 Ok(artifact_hash)
744 }
745
746 fn require_valid_oid(payload: &Value, field: &str) -> KernelResult<String> {
748 let val = payload
749 .get(field)
750 .and_then(|v| v.as_str())
751 .ok_or_else(|| KernelError::ExecutePayloadInvalid(format!("missing {field}")))?;
752
753 if !val.starts_with("sha256:") || val.len() != 71 {
754 return Err(KernelError::ExecutePayloadInvalid(format!(
755 "{field} must be sha256:<64 hex chars>, got: {val}"
756 )));
757 }
758 let hex_part = &val[7..];
759 if !hex_part.chars().all(|c| c.is_ascii_hexdigit()) {
760 return Err(KernelError::ExecutePayloadInvalid(format!(
761 "{field} contains non-hex characters: {val}"
762 )));
763 }
764
765 Ok(val.to_string())
766 }
767
768 async fn read_query(&self, query: ReadQuery) -> KernelResult<Value> {
769 let requester = query.requester_id.as_deref().unwrap_or("anonymous");
774 check_read_access(requester, &query.kind)?;
775
776 match query.kind.as_str() {
777 "health" => Ok(json!({ "status": "ok" })),
778 "actor_energy" => {
779 let actor_id = query.actor_id.ok_or_else(|| {
780 KernelError::InvalidRequest("actor_id is required for actor_energy".to_string())
781 })?;
782 let (energy_balance, reserved_energy) =
783 self.energy_ledger.balance_view(&actor_id).await?;
784 Ok(json!({
785 "actor_id": actor_id,
786 "energy_balance": energy_balance,
787 "reserved_energy": reserved_energy
788 }))
789 }
790 "events" => {
791 let limit = query.limit.unwrap_or(20).clamp(1, 500);
792 let events = self
793 .event_log
794 .query(
795 query.actor_id.as_deref(),
796 query.before_index,
797 query.after_index,
798 limit,
799 )
800 .await?;
801 let next_cursor = events.last().map(|e| e.log_index);
804 let has_more = events.len() as i64 == limit;
805 Ok(json!({
806 "events": events,
807 "has_more": has_more,
808 "next_cursor": next_cursor
809 }))
810 }
811 "stats" => {
812 let event_count = self.event_log.count().await?;
813 Ok(json!({ "event_count": event_count }))
814 }
815 "snapshot" => {
816 let event_count = self.event_log.count().await?;
819 let cp = self
820 .audit_log
821 .latest_checkpoint()
822 .await
823 .map_err(|e| KernelError::Audit(e.to_string()))?;
824 Ok(json!({
825 "event_count": event_count,
826 "snapshot_hash": cp.root_hash,
827 "generated_at": cp.created_at
828 }))
829 }
830 "paths" => {
831 let paths = self.state_store.paths();
832 Ok(json!({
833 "root": paths.root.display().to_string(),
834 "workspace_root": paths.workspace_root.display().to_string(),
835 "quarantine_root": paths.quarantine_root.display().to_string(),
836 "db_path": paths.db_path.display().to_string()
837 }))
838 }
839 "audit_checkpoint" => {
840 let cp = self
841 .audit_log
842 .latest_checkpoint()
843 .await
844 .map_err(|e| KernelError::Audit(e.to_string()))?;
845 Ok(serde_json::to_value(cp)?)
846 }
847 "audit_inclusion_proof" => {
848 let log_index = query.log_index.ok_or_else(|| {
849 KernelError::InvalidRequest(
850 "log_index is required for audit_inclusion_proof".to_string(),
851 )
852 })? as u64;
853 let tree_size = match query.tree_size {
854 Some(s) => s as u64,
855 None => self
856 .audit_log
857 .tree_size()
858 .await
859 .map_err(|e| KernelError::Audit(e.to_string()))?,
860 };
861 let proof = self
862 .audit_log
863 .inclusion_proof(log_index, tree_size)
864 .await
865 .map_err(|e| KernelError::Audit(e.to_string()))?;
866 Ok(json!({
867 "log_index": log_index,
868 "tree_size": tree_size,
869 "proof": proof
870 }))
871 }
872 "audit_consistency_proof" => {
873 let old_size = query.old_size.ok_or_else(|| {
874 KernelError::InvalidRequest(
875 "old_size is required for audit_consistency_proof".to_string(),
876 )
877 })? as u64;
878 let new_size = query.tree_size.ok_or_else(|| {
879 KernelError::InvalidRequest(
880 "tree_size is required for audit_consistency_proof".to_string(),
881 )
882 })? as u64;
883 let proof = self
884 .audit_log
885 .consistency_proof(old_size, new_size)
886 .await
887 .map_err(|e| KernelError::Audit(e.to_string()))?;
888 Ok(json!({
889 "old_size": old_size,
890 "new_size": new_size,
891 "proof": proof
892 }))
893 }
894 "stellar_info" => {
896 let config = &self.stellar_config;
897 Ok(json!({
898 "gpu_model": config.gpu_model,
899 "cpu_model": config.cpu_model,
900 "int8_tops": config.int8_tops,
901 "energy_per_tick": config.effective_energy_per_tick(),
902 "tick_interval_ms": config.tick_interval_ms,
903 "luminosity_source": config.luminosity_source
904 }))
905 }
906 "envelope_info" => {
908 let actor_id = query.actor_id.ok_or_else(|| {
909 KernelError::InvalidRequest(
910 "actor_id is required for envelope_info".to_string(),
911 )
912 })?;
913 let envelope = self.envelope_store.get_active_for_actor(&actor_id).await?;
914 match envelope {
915 Some(record) => Ok(serde_json::to_value(record)?),
916 None => Ok(json!({ "actor_id": actor_id, "envelope": null })),
917 }
918 }
919 "actor_info" => {
921 let actor_id = query.actor_id.ok_or_else(|| {
922 KernelError::InvalidRequest("actor_id is required for actor_info".to_string())
923 })?;
924 let actor = self.actor_store.get(&actor_id).await?;
925 match actor {
926 Some(record) => Ok(serde_json::to_value(record)?),
927 None => Err(KernelError::ActorNotFound(actor_id)),
928 }
929 }
930 "hold_info" => {
932 let hold_id = query.hold_id.ok_or_else(|| {
933 KernelError::InvalidRequest("hold_id is required for hold_info".to_string())
934 })?;
935 let hold = self.envelope_store.get_hold_request(&hold_id).await?;
936 match hold {
937 Some(record) => Ok(record),
938 None => Err(KernelError::InvalidRequest(format!(
939 "hold_request not found: {hold_id}"
940 ))),
941 }
942 }
943 "holds_pending" => {
945 let holds = self
946 .envelope_store
947 .list_pending_holds(query.actor_id.as_deref())
948 .await?;
949 Ok(json!({ "holds": holds }))
950 }
951 other => Err(KernelError::InvalidRequest(format!(
952 "unsupported read query kind: {other}"
953 ))),
954 }
955 }
956}
957
/// Returns the current wall-clock time as decimal milliseconds since the Unix
/// epoch. A clock set before the epoch yields `"0"`.
fn now_millis_string() -> String {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock is before the epoch: fall back to a zero duration.
        Err(_) => Duration::default(),
    };
    since_epoch.as_millis().to_string()
}
964
/// Returns the current wall-clock time in milliseconds since the Unix epoch
/// as a `u64`. A clock set before the epoch yields `0`.
fn now_millis_u64() -> u64 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock is before the epoch: fall back to a zero duration.
        Err(_) => Duration::default(),
    };
    let millis: u128 = since_epoch.as_millis();
    millis as u64
}
971
972fn parse_policy_version(action: &Action) -> Option<String> {
975 if !matches!(action.action_type, ActionType::Create) || action.target != "system/policy" {
976 return None;
977 }
978 action
979 .payload
980 .get("version")
981 .and_then(|v| v.as_str())
982 .map(|s| s.to_string())
983}
984
985async fn parse_create_actor_spec(
992 action: &Action,
993 actor_store: &ActorStore,
994) -> KernelResult<Option<CreateActorSpec>> {
995 if !matches!(action.action_type, ActionType::Create) || action.target != "ledger/actor" {
996 return Ok(None);
997 }
998
999 let payload = action.payload.as_object().ok_or_else(|| {
1000 KernelError::InvalidRequest("seed actor payload must be an object".to_string())
1001 })?;
1002
1003 let explicit_id = payload.get("actor_id").and_then(Value::as_str);
1005 let purpose = payload
1006 .get("purpose")
1007 .and_then(Value::as_str)
1008 .unwrap_or("legacy");
1009 let energy_balance = payload
1010 .get("energy_balance")
1011 .and_then(Value::as_i64)
1012 .unwrap_or(1000);
1013
1014 let actor_type_str = payload
1016 .get("actor_type")
1017 .and_then(Value::as_str)
1018 .unwrap_or("agent");
1019 let actor_type = ActorType::parse(actor_type_str).ok_or_else(|| {
1020 KernelError::InvalidRequest(format!("invalid actor_type: {actor_type_str}"))
1021 })?;
1022
1023 let actor_id = if let Some(id) = explicit_id {
1025 id.to_string()
1026 } else {
1027 if purpose == "legacy" {
1028 return Err(KernelError::InvalidRequest(
1029 "actor_id or purpose is required to create an actor".to_string(),
1030 ));
1031 }
1032 let seq = actor_store.next_sequence(&action.actor_id, purpose).await?;
1033 derive_agent_id(&action.actor_id, purpose, seq)
1034 };
1035
1036 let creator_record = actor_store.get(&action.actor_id).await?;
1038 let (creator_type, creator_lineage) = match &creator_record {
1039 Some(record) => (record.actor_type.clone(), record.lineage.clone()),
1040 None => (ActorType::Human, vec![]),
1042 };
1043
1044 if creator_type == ActorType::Agent && actor_type == ActorType::Agent {
1046 return Err(KernelError::PolicyViolation(
1047 "agents cannot create agents — creation right belongs to humans (PIP-001 §5)"
1048 .to_string(),
1049 ));
1050 }
1051
1052 let lineage = build_lineage(&creator_type, &action.actor_id, &creator_lineage);
1053
1054 let writable_targets: Vec<WritableTarget> = if let Some(targets_val) =
1056 payload.get("writable_targets")
1057 {
1058 serde_json::from_value(targets_val.clone())
1059 .map_err(|e| KernelError::InvalidRequest(format!("invalid writable_targets: {e}")))?
1060 } else {
1061 vec![]
1062 };
1063
1064 if !writable_targets.is_empty()
1066 && let Some(ref creator) = creator_record
1067 {
1068 validate_child_targets(creator, &writable_targets)?;
1069 }
1070
1071 let energy_share = payload
1073 .get("energy_share")
1074 .and_then(Value::as_f64)
1075 .unwrap_or(0.0);
1076
1077 let reduction_policy = payload
1078 .get("reduction_policy")
1079 .and_then(Value::as_str)
1080 .unwrap_or("none")
1081 .to_string();
1082
1083 Ok(Some(CreateActorSpec {
1084 actor_id,
1085 actor_type,
1086 creator_id: action.actor_id.clone(),
1087 lineage,
1088 purpose: Some(purpose.to_string()),
1089 writable_targets,
1090 energy_balance,
1091 energy_share,
1092 reduction_policy,
1093 }))
1094}
1095
1096async fn parse_create_envelope_spec(
1105 action: &Action,
1106 envelope_store: &EnvelopeStore,
1107) -> KernelResult<Option<(String, EnvelopeSpec, Option<String>)>> {
1108 if !matches!(action.action_type, ActionType::Create) || action.target != "ledger/envelope" {
1109 return Ok(None);
1110 }
1111
1112 let payload = action.payload.as_object().ok_or_else(|| {
1113 KernelError::InvalidRequest("envelope payload must be an object".to_string())
1114 })?;
1115
1116 let actor_id = payload
1117 .get("actor_id")
1118 .and_then(Value::as_str)
1119 .ok_or_else(|| {
1120 KernelError::InvalidRequest("actor_id is required to create an envelope".to_string())
1121 })?
1122 .to_string();
1123
1124 let budget = payload
1125 .get("budget")
1126 .and_then(Value::as_i64)
1127 .ok_or_else(|| {
1128 KernelError::InvalidRequest("budget is required to create an envelope".to_string())
1129 })?;
1130
1131 if budget <= 0 {
1132 return Err(KernelError::InvalidRequest(
1133 "envelope budget must be positive".to_string(),
1134 ));
1135 }
1136
1137 let targets: Vec<String> = if let Some(targets_val) = payload.get("targets") {
1138 serde_json::from_value(targets_val.clone())
1139 .map_err(|e| KernelError::InvalidRequest(format!("invalid envelope targets: {e}")))?
1140 } else {
1141 return Err(KernelError::InvalidRequest(
1142 "targets are required to create an envelope".to_string(),
1143 ));
1144 };
1145
1146 let actions: Vec<String> = if let Some(actions_val) = payload.get("actions") {
1147 serde_json::from_value(actions_val.clone())
1148 .map_err(|e| KernelError::InvalidRequest(format!("invalid envelope actions: {e}")))?
1149 } else {
1150 return Err(KernelError::InvalidRequest(
1151 "actions are required to create an envelope".to_string(),
1152 ));
1153 };
1154
1155 let duration_secs = payload.get("duration_secs").and_then(Value::as_i64);
1156 let report_every = payload.get("report_every").and_then(Value::as_i64);
1157 let hold_timeout_secs = payload.get("hold_timeout_secs").and_then(Value::as_i64);
1158
1159 let hold_on: Vec<HoldRule> = if let Some(hold_on_val) = payload.get("hold_on") {
1161 serde_json::from_value(hold_on_val.clone())
1162 .map_err(|e| KernelError::InvalidRequest(format!("invalid hold_on rules: {e}")))?
1163 } else {
1164 vec![]
1165 };
1166
1167 let spec = EnvelopeSpec {
1168 actor_id,
1169 grantor_id: action.actor_id.clone(),
1170 budget,
1171 targets,
1172 actions,
1173 duration_secs,
1174 report_every,
1175 hold_on,
1176 hold_timeout_secs,
1177 };
1178
1179 let parent_envelope_id = if let Some(grantor_envelope) = envelope_store
1181 .get_active_for_actor(&action.actor_id)
1182 .await?
1183 {
1184 consent::validate_envelope_reduction(&grantor_envelope, &spec)?;
1185 Some(grantor_envelope.envelope_id)
1186 } else {
1187 None
1188 };
1189
1190 let envelope_id = Uuid::new_v4().to_string();
1191
1192 Ok(Some((envelope_id, spec, parent_envelope_id)))
1193}
1194
1195fn parse_hold_response(target: &str, payload: &Value) -> Option<(String, String, Option<String>)> {
1203 let rest = target.strip_prefix("ledger/hold/")?;
1205 if rest.is_empty() || rest.contains('/') {
1207 return None;
1208 }
1209 let hold_id = rest.to_string();
1210
1211 let decision = payload.get("decision").and_then(Value::as_str)?.to_string();
1212
1213 if decision != "approve" && decision != "reject" {
1215 return None;
1216 }
1217
1218 let instruction = payload
1219 .get("instruction")
1220 .and_then(Value::as_str)
1221 .map(|s| s.to_string());
1222
1223 Some((hold_id, decision, instruction))
1224}
1225
1226impl Kernel {
1227 async fn execute_hold_response(
1237 &self,
1238 human_action: &Action,
1239 hold_id: &str,
1240 decision: &str,
1241 instruction: Option<&str>,
1242 ) -> KernelResult<SubmitReceipt> {
1243 let caller = self
1245 .actor_store
1246 .get(&human_action.actor_id)
1247 .await?
1248 .ok_or_else(|| KernelError::ActorNotFound(human_action.actor_id.clone()))?;
1249
1250 if caller.actor_type != punkgo_core::actor::ActorType::Human {
1251 return Err(KernelError::PolicyViolation(format!(
1252 "only Human actors can resolve holds; caller {} is {:?}",
1253 human_action.actor_id, caller.actor_type
1254 )));
1255 }
1256
1257 let hold_record = self
1259 .envelope_store
1260 .get_hold_request(hold_id)
1261 .await?
1262 .ok_or_else(|| {
1263 KernelError::PolicyViolation(format!("hold_request not found: {hold_id}"))
1264 })?;
1265
1266 let _envelope_id = hold_record
1267 .get("envelope_id")
1268 .and_then(Value::as_str)
1269 .ok_or_else(|| {
1270 KernelError::PolicyViolation("hold_request missing envelope_id".to_string())
1271 })?
1272 .to_string();
1273
1274 let agent_id = hold_record
1275 .get("agent_id")
1276 .and_then(Value::as_str)
1277 .ok_or_else(|| {
1278 KernelError::PolicyViolation("hold_request missing agent_id".to_string())
1279 })?
1280 .to_string();
1281
1282 let pending_payload = hold_record
1283 .get("pending_payload")
1284 .cloned()
1285 .unwrap_or(Value::Null);
1286
1287 let trigger_target = hold_record
1288 .get("trigger_target")
1289 .and_then(Value::as_str)
1290 .unwrap_or("")
1291 .to_string();
1292
1293 let trigger_action = hold_record
1294 .get("trigger_action")
1295 .and_then(Value::as_str)
1296 .unwrap_or("")
1297 .to_string();
1298
1299 let status = hold_record
1300 .get("status")
1301 .and_then(Value::as_str)
1302 .unwrap_or("");
1303 if status != "pending" {
1304 return Err(KernelError::PolicyViolation(format!(
1305 "hold_request {hold_id} is already resolved (status={status})"
1306 )));
1307 }
1308
1309 let envelope_id_str = hold_record
1312 .get("envelope_id")
1313 .and_then(Value::as_str)
1314 .unwrap_or("")
1315 .to_string();
1316 if let Ok(Some(env)) = self.envelope_store.get(&envelope_id_str).await {
1317 if let Some(timeout_secs) = env.hold_timeout_secs {
1318 if timeout_secs > 0 {
1319 let triggered_at: u64 = hold_record
1320 .get("triggered_at")
1321 .and_then(|v| v.as_str())
1322 .and_then(|s| s.parse().ok())
1323 .unwrap_or(0);
1324 let now = now_millis_u64();
1325 if now.saturating_sub(triggered_at) > (timeout_secs as u64) * 1000 {
1326 self.expire_timed_out_holds(&envelope_id_str, timeout_secs)
1328 .await?;
1329 return Err(KernelError::PolicyViolation(format!(
1330 "hold_request {hold_id} has timed out and was auto-rejected"
1331 )));
1332 }
1333 }
1334 }
1335 }
1336
1337 let response_payload = json!({
1339 "hold_id": hold_id,
1340 "agent_id": &agent_id,
1341 "decision": decision,
1342 "instruction": instruction,
1343 "trigger": {
1344 "target": &trigger_target,
1345 "action_type": &trigger_action
1346 },
1347 "resolved_by": &human_action.actor_id,
1348 "resolved_at": now_millis_string()
1349 });
1350
1351 let response_action = Action {
1352 actor_id: human_action.actor_id.clone(),
1353 action_type: ActionType::Mutate,
1354 target: format!("ledger/hold/{hold_id}"),
1355 payload: response_payload.clone(),
1356 timestamp: None,
1357 };
1358
1359 let pool = self.state_store.pool();
1362 let mut tx = pool.begin().await?;
1363
1364 self.envelope_store
1365 .resolve_hold_request_in_tx(&mut tx, hold_id, decision, instruction)
1366 .await?;
1367
1368 let hold_reserved_cost = pending_payload
1371 .get("reserved_cost")
1372 .and_then(|v| v.as_i64())
1373 .unwrap_or(0);
1374
1375 let commitment_cost = if decision == "reject" && hold_reserved_cost > 0 {
1376 let cost = ((hold_reserved_cost as f64) * 0.2).ceil() as i64;
1377 self.energy_ledger
1378 .settle_in_tx(&mut tx, &agent_id, hold_reserved_cost, cost)
1379 .await?;
1380 cost
1381 } else {
1382 0
1383 };
1384
1385 let mut response_event = EventRecord {
1386 id: Uuid::new_v4().to_string(),
1387 log_index: 0,
1388 event_hash: String::new(),
1389 actor_id: human_action.actor_id.clone(),
1390 action_type: "hold_response".to_string(),
1391 target: format!("ledger/hold/{hold_id}"),
1392 payload: response_payload.clone(),
1393 payload_hash: payload_hash_hex(&response_action)?,
1394 artifact_hash: None,
1395 reserved_energy: hold_reserved_cost,
1396 settled_energy: commitment_cost,
1397 timestamp: now_millis_string(),
1398 };
1399 self.event_log
1400 .append_in_tx(&mut tx, &mut response_event)
1401 .await?;
1402
1403 let log_index = response_event.log_index as u64;
1405 let tree_size = log_index + 1;
1406 self.audit_log
1407 .append_leaf_in_tx(&mut tx, log_index, &response_event.event_hash)
1408 .await
1409 .map_err(|e| KernelError::Audit(e.to_string()))?;
1410 self.audit_log
1411 .make_checkpoint_in_tx(&mut tx, tree_size)
1412 .await
1413 .map_err(|e| KernelError::Audit(e.to_string()))?;
1414
1415 tx.commit().await?;
1416
1417 info!(
1418 hold_id = %hold_id,
1419 decision = %decision,
1420 agent_id = %agent_id,
1421 resolved_by = %human_action.actor_id,
1422 "PIP-001 §11d: hold resolved"
1423 );
1424
1425 if decision == "approve" {
1427 let orig_target = pending_payload
1429 .get("target")
1430 .and_then(Value::as_str)
1431 .unwrap_or(&trigger_target)
1432 .to_string();
1433 let orig_action_type_str = pending_payload
1434 .get("action_type")
1435 .and_then(Value::as_str)
1436 .unwrap_or(&trigger_action)
1437 .to_string();
1438 let mut orig_payload = pending_payload
1439 .get("payload")
1440 .cloned()
1441 .unwrap_or(Value::Null);
1442
1443 if let Some(obj) = orig_payload.as_object_mut() {
1450 obj.insert("_hold_approved".to_string(), Value::Bool(true));
1451 obj.insert(
1452 "_hold_reserved_cost".to_string(),
1453 Value::Number(hold_reserved_cost.into()),
1454 );
1455 if let Some(instr) = instruction {
1457 obj.insert(
1458 "_hold_instruction".to_string(),
1459 Value::String(instr.to_string()),
1460 );
1461 }
1462 } else {
1463 let mut obj = serde_json::Map::new();
1465 obj.insert("_hold_approved".to_string(), Value::Bool(true));
1466 obj.insert(
1467 "_hold_reserved_cost".to_string(),
1468 Value::Number(hold_reserved_cost.into()),
1469 );
1470 if let Some(instr) = instruction {
1471 obj.insert(
1472 "_hold_instruction".to_string(),
1473 Value::String(instr.to_string()),
1474 );
1475 }
1476 orig_payload = Value::Object(obj);
1477 }
1478
1479 let orig_action_type = match orig_action_type_str.as_str() {
1480 "observe" => ActionType::Observe,
1481 "create" => ActionType::Create,
1482 "mutate" => ActionType::Mutate,
1483 _ => ActionType::Execute,
1484 };
1485
1486 let pending_action = Action {
1487 actor_id: agent_id.clone(),
1488 action_type: orig_action_type,
1489 target: orig_target,
1490 payload: orig_payload,
1491 timestamp: None,
1492 };
1493
1494 info!(
1495 hold_id = %hold_id,
1496 agent_id = %agent_id,
1497 "PIP-001 §11d: re-executing approved pending action"
1498 );
1499
1500 return Box::pin(self.submit_action(pending_action)).await;
1504 }
1505
1506 Ok(SubmitReceipt {
1508 event_id: response_event.id,
1509 log_index: response_event.log_index,
1510 event_hash: response_event.event_hash,
1511 reserved_cost: hold_reserved_cost,
1512 settled_cost: commitment_cost,
1513 artifact_hash: None,
1514 })
1515 }
1516
1517 async fn expire_timed_out_holds(
1524 &self,
1525 envelope_id: &str,
1526 hold_timeout_secs: i64,
1527 ) -> KernelResult<()> {
1528 let holds = self
1529 .envelope_store
1530 .list_pending_holds_for_envelope(envelope_id)
1531 .await?;
1532 let now = now_millis_u64();
1533
1534 for hold in holds {
1535 let triggered_at: u64 = hold
1536 .get("triggered_at")
1537 .and_then(|v| v.as_str())
1538 .and_then(|s| s.parse().ok())
1539 .unwrap_or(0);
1540
1541 if now.saturating_sub(triggered_at) <= (hold_timeout_secs as u64) * 1000 {
1542 continue;
1543 }
1544
1545 let hold_id = hold.get("hold_id").and_then(|v| v.as_str()).unwrap_or("");
1547 let agent_id = hold.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
1548 let pending_payload = hold.get("pending_payload").cloned().unwrap_or(Value::Null);
1549 let reserved_cost = pending_payload
1550 .get("reserved_cost")
1551 .and_then(|v| v.as_i64())
1552 .unwrap_or(0);
1553
1554 let commitment_cost = if reserved_cost > 0 {
1555 ((reserved_cost as f64) * 0.2).ceil() as i64
1556 } else {
1557 0
1558 };
1559
1560 let timeout_payload = json!({
1561 "hold_id": hold_id,
1562 "agent_id": agent_id,
1563 "decision": "timed_out",
1564 "reserved_cost": reserved_cost,
1565 "commitment_cost": commitment_cost,
1566 "triggered_at": triggered_at.to_string(),
1567 "expired_at": now.to_string()
1568 });
1569 let timeout_action = Action {
1570 actor_id: agent_id.to_string(),
1571 action_type: ActionType::Mutate,
1572 target: format!("ledger/hold/{hold_id}"),
1573 payload: timeout_payload.clone(),
1574 timestamp: None,
1575 };
1576 let mut timeout_event = EventRecord {
1577 id: Uuid::new_v4().to_string(),
1578 log_index: 0,
1579 event_hash: String::new(),
1580 actor_id: agent_id.to_string(),
1581 action_type: "hold_timeout".to_string(),
1582 target: format!("ledger/hold/{hold_id}"),
1583 payload: timeout_payload,
1584 payload_hash: payload_hash_hex(&timeout_action)?,
1585 artifact_hash: None,
1586 reserved_energy: reserved_cost,
1587 settled_energy: commitment_cost,
1588 timestamp: now_millis_string(),
1589 };
1590
1591 let pool = self.state_store.pool();
1592 let mut tx = pool.begin().await?;
1593 self.envelope_store
1594 .resolve_hold_request_in_tx(&mut tx, hold_id, "timed_out", None)
1595 .await?;
1596 if reserved_cost > 0 {
1597 self.energy_ledger
1598 .settle_in_tx(&mut tx, agent_id, reserved_cost, commitment_cost)
1599 .await?;
1600 }
1601 self.event_log
1602 .append_in_tx(&mut tx, &mut timeout_event)
1603 .await?;
1604
1605 let t_log_index = timeout_event.log_index as u64;
1607 let t_tree_size = t_log_index + 1;
1608 self.audit_log
1609 .append_leaf_in_tx(&mut tx, t_log_index, &timeout_event.event_hash)
1610 .await
1611 .map_err(|e| KernelError::Audit(e.to_string()))?;
1612 self.audit_log
1613 .make_checkpoint_in_tx(&mut tx, t_tree_size)
1614 .await
1615 .map_err(|e| KernelError::Audit(e.to_string()))?;
1616
1617 tx.commit().await?;
1618
1619 info!(
1620 hold_id = %hold_id,
1621 agent_id = %agent_id,
1622 commitment_cost = commitment_cost,
1623 "PIP-001 §11e: hold auto-expired (timeout)"
1624 );
1625 }
1626 Ok(())
1627 }
1628}