1use std::path::PathBuf;
2
3use serde::{Deserialize, Serialize};
4use serde_json::{Value, json};
5use tracing::{info, warn};
6use uuid::Uuid;
7
8use crate::audit::AuditLog;
9use punkgo_core::action::{Action, ActionType, payload_hash_hex, quote_cost};
10use punkgo_core::actor::{
11 ActorType, CreateActorSpec, WritableTarget, build_lineage, derive_agent_id,
12};
13use punkgo_core::boundary::{check_writable_boundary, validate_child_targets};
14use punkgo_core::consent::{self, AuthorizationMode, CheckpointLevel, EnvelopeSpec, HoldRule};
15use punkgo_core::errors::{KernelError, KernelResult};
16use punkgo_core::policy::{check_read_access, validate_action};
17
18use super::lifecycle;
19use crate::state::{
20 ActorStore, EnergyLedger, EnergyReservation, EnvelopeStore, EventLog, EventRecord,
21 NewHoldRequest, StateStore,
22};
23use punkgo_core::protocol::{RequestEnvelope, RequestType, ResponseEnvelope};
24use punkgo_core::stellar::{StellarConfig, load_stellar_config};
25
/// Startup configuration for the kernel.
#[derive(Debug, Clone)]
pub struct KernelConfig {
    /// Directory passed to `StateStore::bootstrap`; `stellar.toml` is also looked up here.
    pub state_dir: PathBuf,
    /// Name of the IPC endpoint (the `Default` impl uses `"punkgo-kernel"`).
    pub ipc_endpoint: String,
}
32
33impl Default for KernelConfig {
34 fn default() -> Self {
35 let state_dir = std::env::var("PUNKGO_STATE_DIR")
36 .map(PathBuf::from)
37 .unwrap_or_else(|_| default_state_dir());
38 Self {
39 state_dir,
40 ipc_endpoint: "punkgo-kernel".to_string(),
41 }
42 }
43}
44
45fn default_state_dir() -> PathBuf {
49 if let Some(home) = home_dir() {
50 return home.join(".punkgo").join("state");
51 }
52 PathBuf::from("state")
53}
54
/// Resolves the current user's home directory from the environment.
///
/// Lookup order:
/// 1. `HOME`
/// 2. `USERPROFILE`
/// 3. `HOMEDRIVE` joined with `HOMEPATH` (both must be present)
///
/// Variables that are set but empty are treated as unset: an empty value would
/// otherwise produce an empty (relative) path instead of falling through to the
/// next candidate.
///
/// Returns `None` when no candidate yields a value.
fn home_dir() -> Option<PathBuf> {
    /// Reads an environment variable, discarding values that are empty.
    fn non_empty_var(key: &str) -> Option<std::ffi::OsString> {
        std::env::var_os(key).filter(|value| !value.is_empty())
    }

    if let Some(home) = non_empty_var("HOME") {
        return Some(PathBuf::from(home));
    }
    if let Some(profile) = non_empty_var("USERPROFILE") {
        return Some(PathBuf::from(profile));
    }
    let drive = non_empty_var("HOMEDRIVE")?;
    let path = non_empty_var("HOMEPATH")?;
    let mut p = PathBuf::from(drive);
    p.push(path);
    Some(p)
}
71
/// Receipt returned to the caller after a submitted action has been recorded.
#[derive(Debug, Clone, Serialize)]
pub struct SubmitReceipt {
    /// Identifier of the recorded event (copied from the event record's `id`).
    pub event_id: String,
    /// Log index of the recorded event (copied from the event record after append).
    pub log_index: i64,
    /// Hash of the recorded event (copied from the event record after append).
    pub event_hash: String,
    /// Energy reserved for the action.
    pub reserved_cost: i64,
    /// Energy settled for the action; `submit_action` currently sets it equal to `reserved_cost`.
    pub settled_cost: i64,
    /// Validated `artifact_hash` from the payload; only populated for `Execute` actions.
    pub artifact_hash: Option<String>,
}
86
/// Payload of a `Read` request, deserialized from the request envelope.
///
/// Which optional fields are required depends on `kind`; see `Kernel::read_query`.
#[derive(Debug, Deserialize)]
struct ReadQuery {
    /// Selects the query to run (for example `"health"`, `"events"`, `"actor_info"`).
    kind: String,
    /// Actor the query is about; required by `actor_energy`, `envelope_info` and
    /// `actor_info`, and an optional filter for `events` and `holds_pending`.
    actor_id: Option<String>,
    /// Hold to look up; required by `hold_info`.
    #[serde(default)]
    hold_id: Option<String>,
    /// Page size for `events`; defaults to 20 and is clamped to `1..=500`.
    limit: Option<i64>,
    /// Leaf index; required by `audit_inclusion_proof`.
    log_index: Option<i64>,
    /// Tree size for `audit_inclusion_proof` (optional) and `audit_consistency_proof` (required).
    tree_size: Option<i64>,
    /// Older tree size; required by `audit_consistency_proof`.
    old_size: Option<i64>,
    /// Passed to the event log query for `events`.
    #[serde(default)]
    before_index: Option<i64>,
    /// Passed to the event log query for `events`.
    #[serde(default)]
    after_index: Option<i64>,
    /// Identity used for the read-access check; `"anonymous"` is used when absent.
    #[serde(default)]
    requester_id: Option<String>,
}
115
/// The kernel: owns the state store and the components built on top of its pool.
///
/// All stores below are constructed in `Kernel::bootstrap` from `state_store.pool()`.
pub struct Kernel {
    /// Bootstrapped from the configured state directory; source of the database pool.
    state_store: StateStore,
    /// Energy reservation and settlement.
    energy_ledger: EnergyLedger,
    /// Event log that events are appended to.
    event_log: EventLog,
    /// Audit log (created with the label `"punkgo/kernel"`) that event hashes are appended to.
    audit_log: AuditLog,
    /// Actor records.
    actor_store: ActorStore,
    /// Envelopes and hold requests.
    envelope_store: EnvelopeStore,
    /// Configuration loaded from `stellar.toml` in the state directory.
    stellar_config: StellarConfig,
}
132
133impl Kernel {
134 pub async fn bootstrap(config: &KernelConfig) -> KernelResult<Self> {
138 let state_store = StateStore::bootstrap(&config.state_dir).await?;
139 let energy_ledger = EnergyLedger::new(state_store.pool());
140 let event_log = EventLog::new(state_store.pool());
141 let audit_log = AuditLog::new(state_store.pool(), "punkgo/kernel");
142 let actor_store = ActorStore::new(state_store.pool());
143 let envelope_store = EnvelopeStore::new(state_store.pool());
144
145 let stellar_config_path = config.state_dir.join("stellar.toml");
147 let stellar_config = load_stellar_config(&stellar_config_path)?;
148 info!(
149 energy_per_tick = stellar_config.effective_energy_per_tick(),
150 int8_tops = stellar_config.int8_tops,
151 tick_interval_ms = stellar_config.tick_interval_ms,
152 "stellar configuration loaded"
153 );
154
155 Ok(Self {
156 state_store,
157 energy_ledger,
158 event_log,
159 audit_log,
160 actor_store,
161 envelope_store,
162 stellar_config,
163 })
164 }
165
    /// Returns the stellar configuration held by this kernel.
    pub fn stellar_config(&self) -> &StellarConfig {
        &self.stellar_config
    }
171
    /// Returns the kernel's energy ledger.
    pub fn energy_ledger(&self) -> &EnergyLedger {
        &self.energy_ledger
    }
176
    /// Returns the kernel's actor store.
    pub fn actor_store(&self) -> &ActorStore {
        &self.actor_store
    }
181
    /// Returns the kernel's envelope store.
    pub fn envelope_store(&self) -> &EnvelopeStore {
        &self.envelope_store
    }
186
    /// Returns the SQLite pool obtained from the state store.
    pub fn pool(&self) -> sqlx::SqlitePool {
        self.state_store.pool()
    }
191
192 pub async fn handle_request(&self, req: RequestEnvelope) -> ResponseEnvelope {
194 let request_id = req.request_id.clone();
195 info!(
196 request_id = %request_id,
197 request_type = ?req.request_type,
198 "received request"
199 );
200 match self.dispatch(req).await {
201 Ok(payload) => {
202 info!(request_id = %request_id, "request completed");
203 ResponseEnvelope::ok(request_id, payload)
204 }
205 Err(err) => {
206 warn!(error = %err, "request failed");
207 ResponseEnvelope::err_structured(request_id, &err)
208 }
209 }
210 }
211
212 async fn dispatch(&self, req: RequestEnvelope) -> KernelResult<Value> {
213 match req.request_type {
214 RequestType::Quote => {
215 let action: Action = serde_json::from_value(req.payload)?;
216 validate_action(&action)?;
217 let cost = quote_cost(&action);
218 Ok(json!({ "cost": cost }))
219 }
220 RequestType::Submit => {
221 let action: Action = serde_json::from_value(req.payload)?;
222 let receipt = self.submit_action(action).await?;
223 Ok(serde_json::to_value(receipt)?)
224 }
225 RequestType::Read => {
226 let query: ReadQuery = serde_json::from_value(req.payload)?;
227 self.read_query(query).await
228 }
229 }
230 }
231
    /// Validates, authorizes, charges and records a single action, returning its receipt.
    ///
    /// Processing order:
    /// 1. Validate the action and require that the acting actor exists.
    /// 2. If an actor record is found: reject state-changing actions from frozen actors,
    ///    check the writable boundary, check lineage for agents, and run the envelope
    ///    authorization / hold-trigger logic for state-changing actions.
    /// 3. If the action is a hold response (`Mutate` on `ledger/hold/<id>` with a
    ///    decision), delegate to `execute_hold_response` and return its result.
    /// 4. Authorize lifecycle operations, parse actor/envelope creation specs and the
    ///    policy version.
    /// 5. Reserve energy (or reuse a cost carried in the payload), validate `Execute`
    ///    payloads, then commit settlement, creations and the event in one transaction
    ///    via `finalize_energy_and_event`.
    /// 6. After the commit: record the policy version, consume envelope budget and
    ///    apply checkpoints, and execute lifecycle operations.
    ///
    /// # Errors
    ///
    /// * `KernelError::ActorNotFound` — the acting actor (or a lifecycle target) is unknown.
    /// * `KernelError::ActorFrozen` — a frozen actor attempted a state-changing action.
    /// * `KernelError::HoldTriggered` — the action matched a hold rule; the hold request
    ///   and its event have been committed and the action itself was not executed.
    /// * Any error propagated from validation, authorization, stores or the audit log.
    async fn submit_action(&self, action: Action) -> KernelResult<SubmitReceipt> {
        validate_action(&action)?;
        if !self.state_store.actor_exists(&action.actor_id).await? {
            return Err(KernelError::ActorNotFound(action.actor_id.clone()));
        }

        // The checks below only run when the actor store has a record for this actor.
        if let Some(actor) = self.actor_store.get(&action.actor_id).await? {
            if actor.status == punkgo_core::actor::ActorStatus::Frozen
                && action.action_type.is_state_changing()
            {
                return Err(KernelError::ActorFrozen(format!(
                    "actor {} is frozen and cannot perform state-changing actions",
                    action.actor_id
                )));
            }
            check_writable_boundary(&actor, &action.target, &action.action_type)?;

            // Agents additionally need their lineage to pass the lifecycle check.
            if actor.actor_type == punkgo_core::actor::ActorType::Agent
                && action.action_type.is_state_changing()
            {
                lifecycle::check_lineage_active(&self.actor_store, &actor.lineage).await?;
            }

            if action.action_type.is_state_changing() {
                let envelope = self
                    .envelope_store
                    .get_active_for_actor(&action.actor_id)
                    .await?;

                // An envelope that has expired is marked as such and then ignored.
                let envelope = if let Some(env) = envelope {
                    if consent::is_envelope_expired(&env, now_millis_u64()) {
                        self.envelope_store
                            .set_status(&env.envelope_id, &consent::EnvelopeStatus::Expired)
                            .await?;
                        None
                    } else {
                        Some(env)
                    }
                } else {
                    None
                };

                let auth_mode = consent::check_authorization(&actor, envelope.as_ref())?;

                if auth_mode == AuthorizationMode::ManOnTheLoop
                    && let Some(ref env) = envelope
                {
                    consent::check_envelope_covers(
                        env,
                        &action.target,
                        action.action_type.as_str(),
                    )?;

                    // Expire timed-out holds first, but only when the envelope has hold
                    // rules and a positive timeout.
                    if !env.hold_on.is_empty() {
                        if let Some(timeout_secs) = env.hold_timeout_secs {
                            if timeout_secs > 0 {
                                self.expire_timed_out_holds(&env.envelope_id, timeout_secs)
                                    .await?;
                            }
                        }
                    }

                    // NOTE(review): `_hold_approved` is read from the submitted payload and
                    // skips the hold trigger below — confirm external callers cannot set it.
                    let hold_approved = action
                        .payload
                        .get("_hold_approved")
                        .and_then(|v| v.as_bool())
                        .unwrap_or(false);
                    if !hold_approved
                        && consent::check_hold_trigger(
                            &env.hold_on,
                            &action.target,
                            action.action_type.as_str(),
                        )
                    {
                        let hold_id = Uuid::new_v4().to_string();

                        let reserved_cost = quote_cost(&action) as i64;

                        let hold_payload = json!({
                            "hold_id": &hold_id,
                            "agent_id": &action.actor_id,
                            "trigger": {
                                "target": &action.target,
                                "action_type": action.action_type.as_str()
                            },
                            "pending_action": {
                                "target": &action.target,
                                "action_type": action.action_type.as_str(),
                                "payload": &action.payload
                            },
                            "reserved_cost": reserved_cost,
                            "triggered_at": now_millis_string()
                        });

                        // Only used as the input to `payload_hash_hex` for the hold event.
                        let hold_action = Action {
                            actor_id: action.actor_id.clone(),
                            action_type: ActionType::Mutate,
                            target: format!("ledger/hold/{hold_id}"),
                            payload: hold_payload.clone(),
                            timestamp: None,
                        };
                        // `log_index` and `event_hash` start as placeholders; they are read
                        // back from this record after `append_in_tx` below.
                        let mut hold_event = EventRecord {
                            id: Uuid::new_v4().to_string(),
                            log_index: 0,
                            event_hash: String::new(),
                            actor_id: action.actor_id.clone(),
                            action_type: "hold_request".to_string(),
                            target: format!("ledger/hold/{hold_id}"),
                            payload: hold_payload.clone(),
                            payload_hash: payload_hash_hex(&hold_action)?,
                            artifact_hash: None,
                            reserved_energy: reserved_cost,
                            settled_energy: 0,
                            timestamp: now_millis_string(),
                        };

                        // Reservation, hold event, hold request and audit leaf share one
                        // transaction.
                        let pool = self.state_store.pool();
                        let mut tx = pool.begin().await?;
                        if reserved_cost > 0 {
                            self.energy_ledger
                                .reserve_in_tx(&mut tx, &action.actor_id, reserved_cost)
                                .await?;
                        }
                        self.event_log
                            .append_in_tx(&mut tx, &mut hold_event)
                            .await?;
                        self.envelope_store
                            .create_hold_request_in_tx(
                                &mut tx,
                                &NewHoldRequest {
                                    hold_id: &hold_id,
                                    envelope_id: &env.envelope_id,
                                    agent_id: &action.actor_id,
                                    trigger_target: &action.target,
                                    trigger_action: action.action_type.as_str(),
                                    pending_payload: &json!({
                                        "target": &action.target,
                                        "action_type": action.action_type.as_str(),
                                        "payload": &action.payload,
                                        "reserved_cost": reserved_cost
                                    }),
                                },
                            )
                            .await?;
                        let log_index = hold_event.log_index as u64;
                        self.audit_log
                            .append_leaf_in_tx(&mut tx, log_index, &hold_event.event_hash)
                            .await
                            .map_err(|e| KernelError::Audit(e.to_string()))?;
                        tx.commit().await?;

                        // The hold is reported to the caller as an error; the original
                        // action is not executed on this path.
                        return Err(KernelError::HoldTriggered {
                            hold_id,
                            agent_id: action.actor_id.clone(),
                        });
                    }
                }
            }
        }

        // Hold responses are only recognised on `Mutate` actions.
        let hold_response = if matches!(action.action_type, ActionType::Mutate) {
            parse_hold_response(&action.target, &action.payload)
        } else {
            None
        };

        if let Some((ref hold_id, ref decision, ref instruction)) = hold_response {
            return self
                .execute_hold_response(&action, hold_id, decision, instruction.as_deref())
                .await;
        }

        // Lifecycle operations are only recognised on `Mutate` actions.
        let lifecycle_op = if matches!(action.action_type, ActionType::Mutate) {
            lifecycle::parse_lifecycle_op(&action.target, &action.payload)
        } else {
            None
        };

        // Authorize the lifecycle operation now; it is executed after the event commit.
        if let Some((ref target_actor_id, ref op)) = lifecycle_op {
            let initiator = self
                .actor_store
                .get(&action.actor_id)
                .await?
                .ok_or_else(|| KernelError::ActorNotFound(action.actor_id.clone()))?;
            let target_actor = self
                .actor_store
                .get(target_actor_id)
                .await?
                .ok_or_else(|| KernelError::ActorNotFound(target_actor_id.clone()))?;
            lifecycle::validate_lifecycle_authorization(&initiator, &target_actor, op).await?;
        }

        let create_actor_spec = parse_create_actor_spec(&action, &self.actor_store).await?;
        let create_envelope_spec =
            parse_create_envelope_spec(&action, &self.envelope_store).await?;
        let policy_version = parse_policy_version(&action);

        // When the payload carries `_hold_reserved_cost`, that value is used as the cost and
        // no new reservation is made; an `EnergyReservation` is built in memory so that
        // settlement still runs for it.
        // NOTE(review): presumably set when an approved hold is replayed — confirm that
        // external callers cannot supply this field themselves.
        let (reserved_cost, reservation) = if let Some(hold_reserved) = action
            .payload
            .get("_hold_reserved_cost")
            .and_then(|v| v.as_i64())
        {
            let phantom = if hold_reserved > 0 {
                Some(EnergyReservation {
                    actor_id: action.actor_id.clone(),
                    reserved: hold_reserved,
                })
            } else {
                None
            };
            (hold_reserved, phantom)
        } else {
            let cost = quote_cost(&action) as i64;
            let res = if cost > 0 {
                Some(self.energy_ledger.reserve(&action.actor_id, cost).await?)
            } else {
                None
            };
            (cost, res)
        };

        // `Execute` payloads are validated here; the validated `artifact_hash` is recorded.
        let artifact_hash = if matches!(action.action_type, ActionType::Execute) {
            Some(Self::validate_execute_payload(&action.payload)?)
        } else {
            None
        };

        // The full reservation is settled.
        let settled_cost = reserved_cost;

        // `log_index` and `event_hash` are placeholders here; the receipt reads them back
        // from this record after `finalize_energy_and_event`.
        let mut event = EventRecord {
            id: Uuid::new_v4().to_string(),
            log_index: 0,
            event_hash: String::new(),
            actor_id: action.actor_id.clone(),
            action_type: action.action_type.as_str().to_string(),
            target: action.target.clone(),
            payload: action.payload.clone(),
            payload_hash: payload_hash_hex(&action)?,
            artifact_hash: artifact_hash.clone(),
            reserved_energy: reserved_cost,
            settled_energy: settled_cost,
            timestamp: now_millis_string(),
        };
        self.finalize_energy_and_event(
            reservation.as_ref(),
            settled_cost,
            create_actor_spec.as_ref(),
            create_envelope_spec.as_ref(),
            &mut event,
        )
        .await?;

        // Everything below runs after the event has been committed.
        if let Some(ref version) = policy_version {
            if let Err(err) = self.state_store.set_policy_version(version).await {
                warn!(error = %err, "failed to record policy version after commit");
            } else {
                info!(policy_version = %version, event_id = %event.id, "policy version updated");
            }
        }

        // Envelope budget is consumed only for agents with an active envelope and a
        // positive settled cost; lookup failures skip this step silently.
        if action.action_type.is_state_changing()
            && settled_cost > 0
            && let Ok(Some(actor)) = self.actor_store.get(&action.actor_id).await
            && actor.actor_type == ActorType::Agent
            && let Ok(Some(envelope)) = self
                .envelope_store
                .get_active_for_actor(&action.actor_id)
                .await
        {
            let checkpoint = consent::check_checkpoint(&envelope, settled_cost);

            // NOTE(review): the `?` operators in this block return an error to the caller
            // although the event is already committed; the `Err` arm below only logs.
            let pool = self.state_store.pool();
            let mut tx = pool.begin().await?;
            match self
                .envelope_store
                .consume_budget_in_tx(&mut tx, &envelope.envelope_id, settled_cost)
                .await
            {
                Ok(_new_consumed) => {
                    match checkpoint {
                        // Halt: the envelope is marked expired in the same transaction.
                        Some(CheckpointLevel::Halt) => {
                            self.envelope_store
                                .set_status_in_tx(
                                    &mut tx,
                                    &envelope.envelope_id,
                                    &consent::EnvelopeStatus::Expired,
                                )
                                .await?;
                            info!(
                                envelope_id = %envelope.envelope_id,
                                actor_id = %action.actor_id,
                                "checkpoint: HALT — envelope budget exhausted"
                            );
                        }
                        // Report: only a log entry is produced.
                        Some(CheckpointLevel::Report) => {
                            info!(
                                envelope_id = %envelope.envelope_id,
                                actor_id = %action.actor_id,
                                budget = envelope.budget,
                                consumed = envelope.budget_consumed + settled_cost,
                                "checkpoint: REPORT — summary logged"
                            );
                        }
                        None => {}
                    }
                    tx.commit().await?;
                }
                Err(err) => {
                    warn!(
                        error = %err,
                        "envelope budget consumption failed (event already committed)"
                    );
                }
            }
        }

        // Execute the lifecycle operation authorized earlier; failures are logged only.
        if let Some((ref target_actor_id, ref op)) = lifecycle_op {
            let pool = self.state_store.pool();
            match op {
                punkgo_core::actor::LifecycleOp::Freeze { .. } => {
                    match lifecycle::execute_freeze(&self.actor_store, &pool, target_actor_id).await
                    {
                        Ok(frozen_ids) => {
                            info!(
                                target = %target_actor_id,
                                cascade_count = frozen_ids.len(),
                                "lifecycle: freeze executed"
                            );
                        }
                        Err(err) => {
                            warn!(error = %err, "lifecycle: freeze failed (event already committed)");
                        }
                    }
                }
                punkgo_core::actor::LifecycleOp::Unfreeze => {
                    match lifecycle::execute_unfreeze(&self.actor_store, &pool, target_actor_id)
                        .await
                    {
                        Ok(()) => {
                            info!(target = %target_actor_id, "lifecycle: unfreeze executed");
                        }
                        Err(err) => {
                            warn!(error = %err, "lifecycle: unfreeze failed (event already committed)");
                        }
                    }
                }
                // Terminate goes through the same `execute_freeze` call as Freeze.
                punkgo_core::actor::LifecycleOp::Terminate { .. } => {
                    match lifecycle::execute_freeze(&self.actor_store, &pool, target_actor_id).await
                    {
                        Ok(frozen_ids) => {
                            info!(
                                target = %target_actor_id,
                                cascade_count = frozen_ids.len(),
                                "lifecycle: terminate executed (cascade frozen)"
                            );
                        }
                        Err(err) => {
                            warn!(error = %err, "lifecycle: terminate failed (event already committed)");
                        }
                    }
                }
            }
        }

        Ok(SubmitReceipt {
            event_id: event.id,
            log_index: event.log_index,
            event_hash: event.event_hash,
            reserved_cost,
            settled_cost,
            artifact_hash,
        })
    }
656
657 async fn finalize_energy_and_event(
658 &self,
659 reservation: Option<&EnergyReservation>,
660 settled_cost: i64,
661 create_actor: Option<&CreateActorSpec>,
662 create_envelope: Option<&(String, EnvelopeSpec, Option<String>)>,
663 event: &mut EventRecord,
664 ) -> KernelResult<()> {
665 let pool = self.state_store.pool();
666 let mut tx = pool.begin().await?;
667
668 if let Some(res) = reservation {
670 self.energy_ledger
671 .settle_in_tx(&mut tx, &res.actor_id, res.reserved, settled_cost)
672 .await?;
673 }
674
675 if let Some(spec) = create_actor {
677 self.energy_ledger
678 .create_actor_in_tx(&mut tx, &spec.actor_id, spec.energy_balance)
679 .await?;
680 self.actor_store.create_in_tx(&mut tx, spec).await?;
681 info!(
682 created_actor = %spec.actor_id,
683 actor_type = spec.actor_type.as_str(),
684 energy_balance = spec.energy_balance,
685 "actor created in transaction"
686 );
687 }
688
689 if let Some((envelope_id, spec, parent_id)) = create_envelope {
691 self.envelope_store
692 .create_in_tx(&mut tx, envelope_id, spec, parent_id.as_deref())
693 .await?;
694 info!(
695 envelope_id = %envelope_id,
696 actor_id = %spec.actor_id,
697 grantor_id = %spec.grantor_id,
698 budget = spec.budget,
699 "envelope created in transaction"
700 );
701 }
702
703 self.event_log.append_in_tx(&mut tx, event).await?;
705
706 let log_index = event.log_index as u64;
708 self.audit_log
709 .append_leaf_in_tx(&mut tx, log_index, &event.event_hash)
710 .await
711 .map_err(|e| KernelError::Audit(e.to_string()))?;
712
713 tx.commit().await?;
718 info!(event_id = %event.id, log_index = event.log_index, "event committed");
719 Ok(())
720 }
721
722 fn validate_execute_payload(payload: &Value) -> KernelResult<String> {
727 Self::require_valid_oid(payload, "input_oid")?;
728 Self::require_valid_oid(payload, "output_oid")?;
729
730 if payload.get("exit_code").and_then(|v| v.as_i64()).is_none() {
731 return Err(KernelError::ExecutePayloadInvalid(
732 "missing or invalid exit_code (must be integer)".to_string(),
733 ));
734 }
735
736 let artifact_hash = Self::require_valid_oid(payload, "artifact_hash")?;
737 Ok(artifact_hash)
738 }
739
740 fn require_valid_oid(payload: &Value, field: &str) -> KernelResult<String> {
742 let val = payload
743 .get(field)
744 .and_then(|v| v.as_str())
745 .ok_or_else(|| KernelError::ExecutePayloadInvalid(format!("missing {field}")))?;
746
747 if !val.starts_with("sha256:") || val.len() != 71 {
748 return Err(KernelError::ExecutePayloadInvalid(format!(
749 "{field} must be sha256:<64 hex chars>, got: {val}"
750 )));
751 }
752 let hex_part = &val[7..];
753 if !hex_part.chars().all(|c| c.is_ascii_hexdigit()) {
754 return Err(KernelError::ExecutePayloadInvalid(format!(
755 "{field} contains non-hex characters: {val}"
756 )));
757 }
758
759 Ok(val.to_string())
760 }
761
762 async fn read_query(&self, query: ReadQuery) -> KernelResult<Value> {
763 let requester = query.requester_id.as_deref().unwrap_or("anonymous");
768 check_read_access(requester, &query.kind)?;
769
770 match query.kind.as_str() {
771 "health" => Ok(json!({ "status": "ok" })),
772 "actor_energy" => {
773 let actor_id = query.actor_id.ok_or_else(|| {
774 KernelError::InvalidRequest("actor_id is required for actor_energy".to_string())
775 })?;
776 let (energy_balance, reserved_energy) =
777 self.energy_ledger.balance_view(&actor_id).await?;
778 Ok(json!({
779 "actor_id": actor_id,
780 "energy_balance": energy_balance,
781 "reserved_energy": reserved_energy
782 }))
783 }
784 "events" => {
785 let limit = query.limit.unwrap_or(20).clamp(1, 500);
786 let events = self
787 .event_log
788 .query(
789 query.actor_id.as_deref(),
790 query.before_index,
791 query.after_index,
792 limit,
793 )
794 .await?;
795 let next_cursor = events.last().map(|e| e.log_index);
798 let has_more = events.len() as i64 == limit;
799 Ok(json!({
800 "events": events,
801 "has_more": has_more,
802 "next_cursor": next_cursor
803 }))
804 }
805 "stats" => {
806 let event_count = self.event_log.count().await?;
807 Ok(json!({ "event_count": event_count }))
808 }
809 "snapshot" => {
810 let event_count = self.event_log.count().await?;
813 self.audit_log
814 .ensure_checkpoint(event_count as u64)
815 .await
816 .map_err(|e| KernelError::Audit(e.to_string()))?;
817 let cp = self
818 .audit_log
819 .latest_checkpoint()
820 .await
821 .map_err(|e| KernelError::Audit(e.to_string()))?;
822 Ok(json!({
823 "event_count": event_count,
824 "snapshot_hash": cp.root_hash,
825 "generated_at": cp.created_at
826 }))
827 }
828 "paths" => {
829 let paths = self.state_store.paths();
830 Ok(json!({
831 "root": paths.root.display().to_string(),
832 "workspace_root": paths.workspace_root.display().to_string(),
833 "quarantine_root": paths.quarantine_root.display().to_string(),
834 "db_path": paths.db_path.display().to_string()
835 }))
836 }
837 "audit_checkpoint" => {
838 let event_count = self.event_log.count().await?;
839 self.audit_log
840 .ensure_checkpoint(event_count as u64)
841 .await
842 .map_err(|e| KernelError::Audit(e.to_string()))?;
843 let cp = self
844 .audit_log
845 .latest_checkpoint()
846 .await
847 .map_err(|e| KernelError::Audit(e.to_string()))?;
848 Ok(serde_json::to_value(cp)?)
849 }
850 "audit_inclusion_proof" => {
851 let log_index = query.log_index.ok_or_else(|| {
852 KernelError::InvalidRequest(
853 "log_index is required for audit_inclusion_proof".to_string(),
854 )
855 })? as u64;
856 let tree_size = match query.tree_size {
857 Some(s) => s as u64,
858 None => {
859 let event_count = self.event_log.count().await? as u64;
861 self.audit_log
862 .ensure_checkpoint(event_count)
863 .await
864 .map_err(|e| KernelError::Audit(e.to_string()))?;
865 self.audit_log
866 .tree_size()
867 .await
868 .map_err(|e| KernelError::Audit(e.to_string()))?
869 }
870 };
871 let proof = self
872 .audit_log
873 .inclusion_proof(log_index, tree_size)
874 .await
875 .map_err(|e| KernelError::Audit(e.to_string()))?;
876 Ok(json!({
877 "log_index": log_index,
878 "tree_size": tree_size,
879 "proof": proof
880 }))
881 }
882 "audit_consistency_proof" => {
883 let old_size = query.old_size.ok_or_else(|| {
884 KernelError::InvalidRequest(
885 "old_size is required for audit_consistency_proof".to_string(),
886 )
887 })? as u64;
888 let new_size = query.tree_size.ok_or_else(|| {
889 KernelError::InvalidRequest(
890 "tree_size is required for audit_consistency_proof".to_string(),
891 )
892 })? as u64;
893 let proof = self
894 .audit_log
895 .consistency_proof(old_size, new_size)
896 .await
897 .map_err(|e| KernelError::Audit(e.to_string()))?;
898 Ok(json!({
899 "old_size": old_size,
900 "new_size": new_size,
901 "proof": proof
902 }))
903 }
904 "stellar_info" => {
906 let config = &self.stellar_config;
907 Ok(json!({
908 "gpu_model": config.gpu_model,
909 "cpu_model": config.cpu_model,
910 "int8_tops": config.int8_tops,
911 "energy_per_tick": config.effective_energy_per_tick(),
912 "tick_interval_ms": config.tick_interval_ms,
913 "luminosity_source": config.luminosity_source
914 }))
915 }
916 "envelope_info" => {
918 let actor_id = query.actor_id.ok_or_else(|| {
919 KernelError::InvalidRequest(
920 "actor_id is required for envelope_info".to_string(),
921 )
922 })?;
923 let envelope = self.envelope_store.get_active_for_actor(&actor_id).await?;
924 match envelope {
925 Some(record) => Ok(serde_json::to_value(record)?),
926 None => Ok(json!({ "actor_id": actor_id, "envelope": null })),
927 }
928 }
929 "actor_info" => {
931 let actor_id = query.actor_id.ok_or_else(|| {
932 KernelError::InvalidRequest("actor_id is required for actor_info".to_string())
933 })?;
934 let actor = self.actor_store.get(&actor_id).await?;
935 match actor {
936 Some(record) => Ok(serde_json::to_value(record)?),
937 None => Err(KernelError::ActorNotFound(actor_id)),
938 }
939 }
940 "hold_info" => {
942 let hold_id = query.hold_id.ok_or_else(|| {
943 KernelError::InvalidRequest("hold_id is required for hold_info".to_string())
944 })?;
945 let hold = self.envelope_store.get_hold_request(&hold_id).await?;
946 match hold {
947 Some(record) => Ok(record),
948 None => Err(KernelError::InvalidRequest(format!(
949 "hold_request not found: {hold_id}"
950 ))),
951 }
952 }
953 "holds_pending" => {
955 let holds = self
956 .envelope_store
957 .list_pending_holds(query.actor_id.as_deref())
958 .await?;
959 Ok(json!({ "holds": holds }))
960 }
961 other => Err(KernelError::InvalidRequest(format!(
962 "unsupported read query kind: {other}"
963 ))),
964 }
965 }
966}
967
/// Returns the current time as milliseconds since the Unix epoch, formatted as a
/// decimal string. A system clock set before the epoch yields `"0"`.
fn now_millis_string() -> String {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    format!("{}", since_epoch.as_millis())
}
974
/// Returns the current time as milliseconds since the Unix epoch, narrowed to `u64`.
/// A system clock set before the epoch yields `0`.
fn now_millis_u64() -> u64 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    since_epoch.as_millis() as u64
}
981
982fn parse_policy_version(action: &Action) -> Option<String> {
985 if !matches!(action.action_type, ActionType::Create) || action.target != "system/policy" {
986 return None;
987 }
988 action
989 .payload
990 .get("version")
991 .and_then(|v| v.as_str())
992 .map(|s| s.to_string())
993}
994
995async fn parse_create_actor_spec(
1002 action: &Action,
1003 actor_store: &ActorStore,
1004) -> KernelResult<Option<CreateActorSpec>> {
1005 if !matches!(action.action_type, ActionType::Create) || action.target != "ledger/actor" {
1006 return Ok(None);
1007 }
1008
1009 let payload = action.payload.as_object().ok_or_else(|| {
1010 KernelError::InvalidRequest("seed actor payload must be an object".to_string())
1011 })?;
1012
1013 let explicit_id = payload.get("actor_id").and_then(Value::as_str);
1015 let purpose = payload
1016 .get("purpose")
1017 .and_then(Value::as_str)
1018 .unwrap_or("legacy");
1019 let energy_balance = payload
1020 .get("energy_balance")
1021 .and_then(Value::as_i64)
1022 .unwrap_or(1000);
1023
1024 let actor_type_str = payload
1026 .get("actor_type")
1027 .and_then(Value::as_str)
1028 .unwrap_or("agent");
1029 let actor_type = ActorType::parse(actor_type_str).ok_or_else(|| {
1030 KernelError::InvalidRequest(format!("invalid actor_type: {actor_type_str}"))
1031 })?;
1032
1033 let actor_id = if let Some(id) = explicit_id {
1035 id.to_string()
1036 } else {
1037 if purpose == "legacy" {
1038 return Err(KernelError::InvalidRequest(
1039 "actor_id or purpose is required to create an actor".to_string(),
1040 ));
1041 }
1042 let seq = actor_store.next_sequence(&action.actor_id, purpose).await?;
1043 derive_agent_id(&action.actor_id, purpose, seq)
1044 };
1045
1046 let creator_record = actor_store.get(&action.actor_id).await?;
1048 let (creator_type, creator_lineage) = match &creator_record {
1049 Some(record) => (record.actor_type.clone(), record.lineage.clone()),
1050 None => (ActorType::Human, vec![]),
1052 };
1053
1054 if creator_type == ActorType::Agent && actor_type == ActorType::Agent {
1056 return Err(KernelError::PolicyViolation(
1057 "agents cannot create agents — creation right belongs to humans (PIP-001 §5)"
1058 .to_string(),
1059 ));
1060 }
1061
1062 let lineage = build_lineage(&creator_type, &action.actor_id, &creator_lineage);
1063
1064 let writable_targets: Vec<WritableTarget> = if let Some(targets_val) =
1066 payload.get("writable_targets")
1067 {
1068 serde_json::from_value(targets_val.clone())
1069 .map_err(|e| KernelError::InvalidRequest(format!("invalid writable_targets: {e}")))?
1070 } else {
1071 vec![]
1072 };
1073
1074 if !writable_targets.is_empty()
1076 && let Some(ref creator) = creator_record
1077 {
1078 validate_child_targets(creator, &writable_targets)?;
1079 }
1080
1081 let energy_share = payload
1083 .get("energy_share")
1084 .and_then(Value::as_f64)
1085 .unwrap_or(0.0);
1086
1087 let reduction_policy = payload
1088 .get("reduction_policy")
1089 .and_then(Value::as_str)
1090 .unwrap_or("none")
1091 .to_string();
1092
1093 Ok(Some(CreateActorSpec {
1094 actor_id,
1095 actor_type,
1096 creator_id: action.actor_id.clone(),
1097 lineage,
1098 purpose: Some(purpose.to_string()),
1099 writable_targets,
1100 energy_balance,
1101 energy_share,
1102 reduction_policy,
1103 }))
1104}
1105
1106async fn parse_create_envelope_spec(
1115 action: &Action,
1116 envelope_store: &EnvelopeStore,
1117) -> KernelResult<Option<(String, EnvelopeSpec, Option<String>)>> {
1118 if !matches!(action.action_type, ActionType::Create) || action.target != "ledger/envelope" {
1119 return Ok(None);
1120 }
1121
1122 let payload = action.payload.as_object().ok_or_else(|| {
1123 KernelError::InvalidRequest("envelope payload must be an object".to_string())
1124 })?;
1125
1126 let actor_id = payload
1127 .get("actor_id")
1128 .and_then(Value::as_str)
1129 .ok_or_else(|| {
1130 KernelError::InvalidRequest("actor_id is required to create an envelope".to_string())
1131 })?
1132 .to_string();
1133
1134 let budget = payload
1135 .get("budget")
1136 .and_then(Value::as_i64)
1137 .ok_or_else(|| {
1138 KernelError::InvalidRequest("budget is required to create an envelope".to_string())
1139 })?;
1140
1141 if budget <= 0 {
1142 return Err(KernelError::InvalidRequest(
1143 "envelope budget must be positive".to_string(),
1144 ));
1145 }
1146
1147 let targets: Vec<String> = if let Some(targets_val) = payload.get("targets") {
1148 serde_json::from_value(targets_val.clone())
1149 .map_err(|e| KernelError::InvalidRequest(format!("invalid envelope targets: {e}")))?
1150 } else {
1151 return Err(KernelError::InvalidRequest(
1152 "targets are required to create an envelope".to_string(),
1153 ));
1154 };
1155
1156 let actions: Vec<String> = if let Some(actions_val) = payload.get("actions") {
1157 serde_json::from_value(actions_val.clone())
1158 .map_err(|e| KernelError::InvalidRequest(format!("invalid envelope actions: {e}")))?
1159 } else {
1160 return Err(KernelError::InvalidRequest(
1161 "actions are required to create an envelope".to_string(),
1162 ));
1163 };
1164
1165 let duration_secs = payload.get("duration_secs").and_then(Value::as_i64);
1166 let report_every = payload.get("report_every").and_then(Value::as_i64);
1167 let hold_timeout_secs = payload.get("hold_timeout_secs").and_then(Value::as_i64);
1168
1169 let hold_on: Vec<HoldRule> = if let Some(hold_on_val) = payload.get("hold_on") {
1171 serde_json::from_value(hold_on_val.clone())
1172 .map_err(|e| KernelError::InvalidRequest(format!("invalid hold_on rules: {e}")))?
1173 } else {
1174 vec![]
1175 };
1176
1177 let spec = EnvelopeSpec {
1178 actor_id,
1179 grantor_id: action.actor_id.clone(),
1180 budget,
1181 targets,
1182 actions,
1183 duration_secs,
1184 report_every,
1185 hold_on,
1186 hold_timeout_secs,
1187 };
1188
1189 let parent_envelope_id = if let Some(grantor_envelope) = envelope_store
1191 .get_active_for_actor(&action.actor_id)
1192 .await?
1193 {
1194 consent::validate_envelope_reduction(&grantor_envelope, &spec)?;
1195 Some(grantor_envelope.envelope_id)
1196 } else {
1197 None
1198 };
1199
1200 let envelope_id = Uuid::new_v4().to_string();
1201
1202 Ok(Some((envelope_id, spec, parent_envelope_id)))
1203}
1204
1205fn parse_hold_response(target: &str, payload: &Value) -> Option<(String, String, Option<String>)> {
1213 let rest = target.strip_prefix("ledger/hold/")?;
1215 if rest.is_empty() || rest.contains('/') {
1217 return None;
1218 }
1219 let hold_id = rest.to_string();
1220
1221 let decision = payload.get("decision").and_then(Value::as_str)?.to_string();
1222
1223 if decision != "approve" && decision != "reject" {
1225 return None;
1226 }
1227
1228 let instruction = payload
1229 .get("instruction")
1230 .and_then(Value::as_str)
1231 .map(|s| s.to_string());
1232
1233 Some((hold_id, decision, instruction))
1234}
1235
1236impl Kernel {
1237 async fn execute_hold_response(
1247 &self,
1248 human_action: &Action,
1249 hold_id: &str,
1250 decision: &str,
1251 instruction: Option<&str>,
1252 ) -> KernelResult<SubmitReceipt> {
1253 let caller = self
1255 .actor_store
1256 .get(&human_action.actor_id)
1257 .await?
1258 .ok_or_else(|| KernelError::ActorNotFound(human_action.actor_id.clone()))?;
1259
1260 if caller.actor_type != punkgo_core::actor::ActorType::Human {
1261 return Err(KernelError::PolicyViolation(format!(
1262 "only Human actors can resolve holds; caller {} is {:?}",
1263 human_action.actor_id, caller.actor_type
1264 )));
1265 }
1266
1267 let hold_record = self
1269 .envelope_store
1270 .get_hold_request(hold_id)
1271 .await?
1272 .ok_or_else(|| {
1273 KernelError::PolicyViolation(format!("hold_request not found: {hold_id}"))
1274 })?;
1275
1276 let _envelope_id = hold_record
1277 .get("envelope_id")
1278 .and_then(Value::as_str)
1279 .ok_or_else(|| {
1280 KernelError::PolicyViolation("hold_request missing envelope_id".to_string())
1281 })?
1282 .to_string();
1283
1284 let agent_id = hold_record
1285 .get("agent_id")
1286 .and_then(Value::as_str)
1287 .ok_or_else(|| {
1288 KernelError::PolicyViolation("hold_request missing agent_id".to_string())
1289 })?
1290 .to_string();
1291
1292 let pending_payload = hold_record
1293 .get("pending_payload")
1294 .cloned()
1295 .unwrap_or(Value::Null);
1296
1297 let trigger_target = hold_record
1298 .get("trigger_target")
1299 .and_then(Value::as_str)
1300 .unwrap_or("")
1301 .to_string();
1302
1303 let trigger_action = hold_record
1304 .get("trigger_action")
1305 .and_then(Value::as_str)
1306 .unwrap_or("")
1307 .to_string();
1308
1309 let status = hold_record
1310 .get("status")
1311 .and_then(Value::as_str)
1312 .unwrap_or("");
1313 if status != "pending" {
1314 return Err(KernelError::PolicyViolation(format!(
1315 "hold_request {hold_id} is already resolved (status={status})"
1316 )));
1317 }
1318
1319 let envelope_id_str = hold_record
1322 .get("envelope_id")
1323 .and_then(Value::as_str)
1324 .unwrap_or("")
1325 .to_string();
1326 if let Ok(Some(env)) = self.envelope_store.get(&envelope_id_str).await {
1327 if let Some(timeout_secs) = env.hold_timeout_secs {
1328 if timeout_secs > 0 {
1329 let triggered_at: u64 = hold_record
1330 .get("triggered_at")
1331 .and_then(|v| v.as_str())
1332 .and_then(|s| s.parse().ok())
1333 .unwrap_or(0);
1334 let now = now_millis_u64();
1335 if now.saturating_sub(triggered_at) > (timeout_secs as u64) * 1000 {
1336 self.expire_timed_out_holds(&envelope_id_str, timeout_secs)
1338 .await?;
1339 return Err(KernelError::PolicyViolation(format!(
1340 "hold_request {hold_id} has timed out and was auto-rejected"
1341 )));
1342 }
1343 }
1344 }
1345 }
1346
1347 let response_payload = json!({
1349 "hold_id": hold_id,
1350 "agent_id": &agent_id,
1351 "decision": decision,
1352 "instruction": instruction,
1353 "trigger": {
1354 "target": &trigger_target,
1355 "action_type": &trigger_action
1356 },
1357 "resolved_by": &human_action.actor_id,
1358 "resolved_at": now_millis_string()
1359 });
1360
1361 let response_action = Action {
1362 actor_id: human_action.actor_id.clone(),
1363 action_type: ActionType::Mutate,
1364 target: format!("ledger/hold/{hold_id}"),
1365 payload: response_payload.clone(),
1366 timestamp: None,
1367 };
1368
1369 let pool = self.state_store.pool();
1372 let mut tx = pool.begin().await?;
1373
1374 self.envelope_store
1375 .resolve_hold_request_in_tx(&mut tx, hold_id, decision, instruction)
1376 .await?;
1377
1378 let hold_reserved_cost = pending_payload
1381 .get("reserved_cost")
1382 .and_then(|v| v.as_i64())
1383 .unwrap_or(0);
1384
1385 let commitment_cost = if decision == "reject" && hold_reserved_cost > 0 {
1386 let cost = ((hold_reserved_cost as f64) * 0.2).ceil() as i64;
1387 self.energy_ledger
1388 .settle_in_tx(&mut tx, &agent_id, hold_reserved_cost, cost)
1389 .await?;
1390 cost
1391 } else {
1392 0
1393 };
1394
1395 let mut response_event = EventRecord {
1396 id: Uuid::new_v4().to_string(),
1397 log_index: 0,
1398 event_hash: String::new(),
1399 actor_id: human_action.actor_id.clone(),
1400 action_type: "hold_response".to_string(),
1401 target: format!("ledger/hold/{hold_id}"),
1402 payload: response_payload.clone(),
1403 payload_hash: payload_hash_hex(&response_action)?,
1404 artifact_hash: None,
1405 reserved_energy: hold_reserved_cost,
1406 settled_energy: commitment_cost,
1407 timestamp: now_millis_string(),
1408 };
1409 self.event_log
1410 .append_in_tx(&mut tx, &mut response_event)
1411 .await?;
1412
1413 let log_index = response_event.log_index as u64;
1415 self.audit_log
1416 .append_leaf_in_tx(&mut tx, log_index, &response_event.event_hash)
1417 .await
1418 .map_err(|e| KernelError::Audit(e.to_string()))?;
1419 tx.commit().await?;
1421
1422 info!(
1423 hold_id = %hold_id,
1424 decision = %decision,
1425 agent_id = %agent_id,
1426 resolved_by = %human_action.actor_id,
1427 "PIP-001 §11d: hold resolved"
1428 );
1429
1430 if decision == "approve" {
1432 let orig_target = pending_payload
1434 .get("target")
1435 .and_then(Value::as_str)
1436 .unwrap_or(&trigger_target)
1437 .to_string();
1438 let orig_action_type_str = pending_payload
1439 .get("action_type")
1440 .and_then(Value::as_str)
1441 .unwrap_or(&trigger_action)
1442 .to_string();
1443 let mut orig_payload = pending_payload
1444 .get("payload")
1445 .cloned()
1446 .unwrap_or(Value::Null);
1447
1448 if let Some(obj) = orig_payload.as_object_mut() {
1455 obj.insert("_hold_approved".to_string(), Value::Bool(true));
1456 obj.insert(
1457 "_hold_reserved_cost".to_string(),
1458 Value::Number(hold_reserved_cost.into()),
1459 );
1460 if let Some(instr) = instruction {
1462 obj.insert(
1463 "_hold_instruction".to_string(),
1464 Value::String(instr.to_string()),
1465 );
1466 }
1467 } else {
1468 let mut obj = serde_json::Map::new();
1470 obj.insert("_hold_approved".to_string(), Value::Bool(true));
1471 obj.insert(
1472 "_hold_reserved_cost".to_string(),
1473 Value::Number(hold_reserved_cost.into()),
1474 );
1475 if let Some(instr) = instruction {
1476 obj.insert(
1477 "_hold_instruction".to_string(),
1478 Value::String(instr.to_string()),
1479 );
1480 }
1481 orig_payload = Value::Object(obj);
1482 }
1483
1484 let orig_action_type = match orig_action_type_str.as_str() {
1485 "observe" => ActionType::Observe,
1486 "create" => ActionType::Create,
1487 "mutate" => ActionType::Mutate,
1488 _ => ActionType::Execute,
1489 };
1490
1491 let pending_action = Action {
1492 actor_id: agent_id.clone(),
1493 action_type: orig_action_type,
1494 target: orig_target,
1495 payload: orig_payload,
1496 timestamp: None,
1497 };
1498
1499 info!(
1500 hold_id = %hold_id,
1501 agent_id = %agent_id,
1502 "PIP-001 §11d: re-executing approved pending action"
1503 );
1504
1505 return Box::pin(self.submit_action(pending_action)).await;
1509 }
1510
1511 Ok(SubmitReceipt {
1513 event_id: response_event.id,
1514 log_index: response_event.log_index,
1515 event_hash: response_event.event_hash,
1516 reserved_cost: hold_reserved_cost,
1517 settled_cost: commitment_cost,
1518 artifact_hash: None,
1519 })
1520 }
1521
1522 async fn expire_timed_out_holds(
1529 &self,
1530 envelope_id: &str,
1531 hold_timeout_secs: i64,
1532 ) -> KernelResult<()> {
1533 let holds = self
1534 .envelope_store
1535 .list_pending_holds_for_envelope(envelope_id)
1536 .await?;
1537 let now = now_millis_u64();
1538
1539 for hold in holds {
1540 let triggered_at: u64 = hold
1541 .get("triggered_at")
1542 .and_then(|v| v.as_str())
1543 .and_then(|s| s.parse().ok())
1544 .unwrap_or(0);
1545
1546 if now.saturating_sub(triggered_at) <= (hold_timeout_secs as u64) * 1000 {
1547 continue;
1548 }
1549
1550 let hold_id = hold.get("hold_id").and_then(|v| v.as_str()).unwrap_or("");
1552 let agent_id = hold.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
1553 let pending_payload = hold.get("pending_payload").cloned().unwrap_or(Value::Null);
1554 let reserved_cost = pending_payload
1555 .get("reserved_cost")
1556 .and_then(|v| v.as_i64())
1557 .unwrap_or(0);
1558
1559 let commitment_cost = if reserved_cost > 0 {
1560 ((reserved_cost as f64) * 0.2).ceil() as i64
1561 } else {
1562 0
1563 };
1564
1565 let timeout_payload = json!({
1566 "hold_id": hold_id,
1567 "agent_id": agent_id,
1568 "decision": "timed_out",
1569 "reserved_cost": reserved_cost,
1570 "commitment_cost": commitment_cost,
1571 "triggered_at": triggered_at.to_string(),
1572 "expired_at": now.to_string()
1573 });
1574 let timeout_action = Action {
1575 actor_id: agent_id.to_string(),
1576 action_type: ActionType::Mutate,
1577 target: format!("ledger/hold/{hold_id}"),
1578 payload: timeout_payload.clone(),
1579 timestamp: None,
1580 };
1581 let mut timeout_event = EventRecord {
1582 id: Uuid::new_v4().to_string(),
1583 log_index: 0,
1584 event_hash: String::new(),
1585 actor_id: agent_id.to_string(),
1586 action_type: "hold_timeout".to_string(),
1587 target: format!("ledger/hold/{hold_id}"),
1588 payload: timeout_payload,
1589 payload_hash: payload_hash_hex(&timeout_action)?,
1590 artifact_hash: None,
1591 reserved_energy: reserved_cost,
1592 settled_energy: commitment_cost,
1593 timestamp: now_millis_string(),
1594 };
1595
1596 let pool = self.state_store.pool();
1597 let mut tx = pool.begin().await?;
1598 self.envelope_store
1599 .resolve_hold_request_in_tx(&mut tx, hold_id, "timed_out", None)
1600 .await?;
1601 if reserved_cost > 0 {
1602 self.energy_ledger
1603 .settle_in_tx(&mut tx, agent_id, reserved_cost, commitment_cost)
1604 .await?;
1605 }
1606 self.event_log
1607 .append_in_tx(&mut tx, &mut timeout_event)
1608 .await?;
1609
1610 let t_log_index = timeout_event.log_index as u64;
1612 self.audit_log
1613 .append_leaf_in_tx(&mut tx, t_log_index, &timeout_event.event_hash)
1614 .await
1615 .map_err(|e| KernelError::Audit(e.to_string()))?;
1616 tx.commit().await?;
1618
1619 info!(
1620 hold_id = %hold_id,
1621 agent_id = %agent_id,
1622 commitment_cost = commitment_cost,
1623 "PIP-001 §11e: hold auto-expired (timeout)"
1624 );
1625 }
1626 Ok(())
1627 }
1628}