1use std::path::PathBuf;
2
3use serde::{Deserialize, Serialize};
4use serde_json::{Value, json};
5use tracing::{info, warn};
6use uuid::Uuid;
7
8use crate::audit::AuditLog;
9use punkgo_core::action::{Action, ActionType, payload_hash_hex, quote_cost};
10use punkgo_core::actor::{
11 ActorType, CreateActorSpec, WritableTarget, build_lineage, derive_agent_id,
12};
13use punkgo_core::boundary::{check_writable_boundary, validate_child_targets};
14use punkgo_core::consent::{self, AuthorizationMode, CheckpointLevel, EnvelopeSpec, HoldRule};
15use punkgo_core::errors::{KernelError, KernelResult};
16use punkgo_core::policy::{check_read_access, validate_action};
17
18use super::lifecycle;
19use crate::state::{
20 ActorStore, EnergyLedger, EnergyReservation, EnvelopeStore, EventLog, EventRecord,
21 NewHoldRequest, StateStore,
22};
23use punkgo_core::protocol::{RequestEnvelope, RequestType, ResponseEnvelope};
24use punkgo_core::stellar::{StellarConfig, load_stellar_config};
25
26#[derive(Debug, Clone)]
28pub struct KernelConfig {
29 pub state_dir: PathBuf,
30 pub ipc_endpoint: String,
31}
32
33impl Default for KernelConfig {
34 fn default() -> Self {
35 let state_dir = std::env::var("PUNKGO_STATE_DIR")
36 .map(PathBuf::from)
37 .unwrap_or_else(|_| default_state_dir());
38 let ipc_endpoint =
39 std::env::var("PUNKGO_IPC_ENDPOINT").unwrap_or_else(|_| default_ipc_endpoint());
40 Self {
41 state_dir,
42 ipc_endpoint,
43 }
44 }
45}
46
47fn default_ipc_endpoint() -> String {
58 let pid = std::process::id();
59 if cfg!(windows) {
60 format!(r"\\.\pipe\punkgo-kernel-{pid}")
61 } else {
62 let state_dir = std::env::var("PUNKGO_STATE_DIR")
63 .map(std::path::PathBuf::from)
64 .unwrap_or_else(|_| default_state_dir());
65 state_dir
66 .join(format!("daemon-{pid}.sock"))
67 .to_string_lossy()
68 .into_owned()
69 }
70}
71
72fn default_state_dir() -> PathBuf {
73 if let Some(home) = home_dir() {
74 return home.join(".punkgo").join("state");
75 }
76 PathBuf::from("state")
77}
78
79fn home_dir() -> Option<PathBuf> {
80 if let Some(home) = std::env::var_os("HOME") {
82 return Some(PathBuf::from(home));
83 }
84 if let Some(profile) = std::env::var_os("USERPROFILE") {
86 return Some(PathBuf::from(profile));
87 }
88 let drive = std::env::var_os("HOMEDRIVE")?;
90 let path = std::env::var_os("HOMEPATH")?;
91 let mut p = PathBuf::from(drive);
92 p.push(path);
93 Some(p)
94}
95
96#[derive(Debug, Clone, Serialize)]
102pub struct SubmitReceipt {
103 pub event_id: String,
104 pub log_index: i64,
105 pub event_hash: String,
106 pub reserved_cost: i64,
107 pub settled_cost: i64,
108 pub artifact_hash: Option<String>,
109}
110
111#[derive(Debug, Deserialize)]
112struct ReadQuery {
113 kind: String,
114 actor_id: Option<String>,
115 #[serde(default)]
117 hold_id: Option<String>,
118 limit: Option<i64>,
119 log_index: Option<i64>,
121 tree_size: Option<i64>,
123 old_size: Option<i64>,
125 #[serde(default)]
128 before_index: Option<i64>,
129 #[serde(default)]
132 after_index: Option<i64>,
133 #[serde(default)]
137 requester_id: Option<String>,
138}
139
140pub struct Kernel {
148 state_store: StateStore,
149 energy_ledger: EnergyLedger,
150 event_log: EventLog,
151 audit_log: AuditLog,
152 actor_store: ActorStore,
153 envelope_store: EnvelopeStore,
154 stellar_config: StellarConfig,
155}
156
157impl Kernel {
158 pub async fn bootstrap(config: &KernelConfig) -> KernelResult<Self> {
162 let state_store = StateStore::bootstrap(&config.state_dir).await?;
163 let energy_ledger = EnergyLedger::new(state_store.pool());
164 let event_log = EventLog::new(state_store.pool());
165 let audit_log = AuditLog::new(state_store.pool(), "punkgo/kernel");
166 let actor_store = ActorStore::new(state_store.pool());
167 let envelope_store = EnvelopeStore::new(state_store.pool());
168
169 let stellar_config_path = config.state_dir.join("stellar.toml");
171 let stellar_config = load_stellar_config(&stellar_config_path)?;
172 info!(
173 energy_per_tick = stellar_config.effective_energy_per_tick(),
174 int8_tops = stellar_config.int8_tops,
175 tick_interval_ms = stellar_config.tick_interval_ms,
176 "stellar configuration loaded"
177 );
178
179 Ok(Self {
180 state_store,
181 energy_ledger,
182 event_log,
183 audit_log,
184 actor_store,
185 envelope_store,
186 stellar_config,
187 })
188 }
189
190 pub fn stellar_config(&self) -> &StellarConfig {
193 &self.stellar_config
194 }
195
196 pub fn energy_ledger(&self) -> &EnergyLedger {
198 &self.energy_ledger
199 }
200
201 pub fn actor_store(&self) -> &ActorStore {
203 &self.actor_store
204 }
205
206 pub fn envelope_store(&self) -> &EnvelopeStore {
208 &self.envelope_store
209 }
210
211 pub fn pool(&self) -> sqlx::SqlitePool {
213 self.state_store.pool()
214 }
215
216 pub async fn handle_request(&self, req: RequestEnvelope) -> ResponseEnvelope {
218 let request_id = req.request_id.clone();
219 info!(
220 request_id = %request_id,
221 request_type = ?req.request_type,
222 "received request"
223 );
224 match self.dispatch(req).await {
225 Ok(payload) => {
226 info!(request_id = %request_id, "request completed");
227 ResponseEnvelope::ok(request_id, payload)
228 }
229 Err(err) => {
230 warn!(error = %err, "request failed");
231 ResponseEnvelope::err_structured(request_id, &err)
232 }
233 }
234 }
235
236 async fn dispatch(&self, req: RequestEnvelope) -> KernelResult<Value> {
237 match req.request_type {
238 RequestType::Quote => {
239 let action: Action = serde_json::from_value(req.payload)?;
240 validate_action(&action)?;
241 let cost = quote_cost(&action);
242 Ok(json!({ "cost": cost }))
243 }
244 RequestType::Submit => {
245 let action: Action = serde_json::from_value(req.payload)?;
246 let receipt = self.submit_action(action).await?;
247 Ok(serde_json::to_value(receipt)?)
248 }
249 RequestType::Read => {
250 let query: ReadQuery = serde_json::from_value(req.payload)?;
251 self.read_query(query).await
252 }
253 }
254 }
255
256 async fn submit_action(&self, action: Action) -> KernelResult<SubmitReceipt> {
257 validate_action(&action)?;
259 if !self.state_store.actor_exists(&action.actor_id).await? {
260 return Err(KernelError::ActorNotFound(action.actor_id.clone()));
261 }
262
263 if let Some(actor) = self.actor_store.get(&action.actor_id).await? {
267 if actor.status == punkgo_core::actor::ActorStatus::Frozen
268 && action.action_type.is_state_changing()
269 {
270 return Err(KernelError::ActorFrozen(format!(
271 "actor {} is frozen and cannot perform state-changing actions",
272 action.actor_id
273 )));
274 }
275 check_writable_boundary(&actor, &action.target, &action.action_type)?;
277
278 if actor.actor_type == punkgo_core::actor::ActorType::Agent
281 && action.action_type.is_state_changing()
282 {
283 lifecycle::check_lineage_active(&self.actor_store, &actor.lineage).await?;
284 }
285
286 if action.action_type.is_state_changing() {
289 let envelope = self
290 .envelope_store
291 .get_active_for_actor(&action.actor_id)
292 .await?;
293
294 let envelope = if let Some(env) = envelope {
296 if consent::is_envelope_expired(&env, now_millis_u64()) {
297 self.envelope_store
298 .set_status(&env.envelope_id, &consent::EnvelopeStatus::Expired)
299 .await?;
300 None
301 } else {
302 Some(env)
303 }
304 } else {
305 None
306 };
307
308 let auth_mode = consent::check_authorization(&actor, envelope.as_ref())?;
309
310 if auth_mode == AuthorizationMode::ManOnTheLoop
312 && let Some(ref env) = envelope
313 {
314 consent::check_envelope_covers(
315 env,
316 &action.target,
317 action.action_type.as_str(),
318 )?;
319
320 if !env.hold_on.is_empty() {
322 if let Some(timeout_secs) = env.hold_timeout_secs {
323 if timeout_secs > 0 {
324 self.expire_timed_out_holds(&env.envelope_id, timeout_secs)
325 .await?;
326 }
327 }
328 }
329
330 let hold_approved = action
335 .payload
336 .get("_hold_approved")
337 .and_then(|v| v.as_bool())
338 .unwrap_or(false);
339 if !hold_approved
340 && consent::check_hold_trigger(
341 &env.hold_on,
342 &action.target,
343 action.action_type.as_str(),
344 )
345 {
346 let hold_id = Uuid::new_v4().to_string();
347
348 let reserved_cost = quote_cost(&action) as i64;
351
352 let hold_payload = json!({
353 "hold_id": &hold_id,
354 "agent_id": &action.actor_id,
355 "trigger": {
356 "target": &action.target,
357 "action_type": action.action_type.as_str()
358 },
359 "pending_action": {
360 "target": &action.target,
361 "action_type": action.action_type.as_str(),
362 "payload": &action.payload
363 },
364 "reserved_cost": reserved_cost,
365 "triggered_at": now_millis_string()
366 });
367
368 let hold_action = Action {
370 actor_id: action.actor_id.clone(),
371 action_type: ActionType::Mutate,
372 target: format!("ledger/hold/{hold_id}"),
373 payload: hold_payload.clone(),
374 timestamp: None,
375 };
376 let mut hold_event = EventRecord {
377 id: Uuid::new_v4().to_string(),
378 log_index: 0,
379 event_hash: String::new(),
380 actor_id: action.actor_id.clone(),
381 action_type: "hold_request".to_string(),
382 target: format!("ledger/hold/{hold_id}"),
383 payload: hold_payload.clone(),
384 payload_hash: payload_hash_hex(&hold_action)?,
385 artifact_hash: None,
386 reserved_energy: reserved_cost,
387 settled_energy: 0,
388 timestamp: now_millis_string(),
389 };
390
391 let pool = self.state_store.pool();
393 let mut tx = pool.begin().await?;
394 if reserved_cost > 0 {
395 self.energy_ledger
396 .reserve_in_tx(&mut tx, &action.actor_id, reserved_cost)
397 .await?;
398 }
399 self.event_log
400 .append_in_tx(&mut tx, &mut hold_event)
401 .await?;
402 self.envelope_store
403 .create_hold_request_in_tx(
404 &mut tx,
405 &NewHoldRequest {
406 hold_id: &hold_id,
407 envelope_id: &env.envelope_id,
408 agent_id: &action.actor_id,
409 trigger_target: &action.target,
410 trigger_action: action.action_type.as_str(),
411 pending_payload: &json!({
412 "target": &action.target,
413 "action_type": action.action_type.as_str(),
414 "payload": &action.payload,
415 "reserved_cost": reserved_cost
416 }),
417 },
418 )
419 .await?;
420 let log_index = hold_event.log_index as u64;
424 self.audit_log
425 .append_leaf_in_tx(&mut tx, log_index, &hold_event.event_hash)
426 .await
427 .map_err(|e| KernelError::Audit(e.to_string()))?;
428 tx.commit().await?;
430
431 return Err(KernelError::HoldTriggered {
432 hold_id,
433 agent_id: action.actor_id.clone(),
434 });
435 }
436 }
437 }
438 }
439
440 let hold_response = if matches!(action.action_type, ActionType::Mutate) {
443 parse_hold_response(&action.target, &action.payload)
444 } else {
445 None
446 };
447
448 if let Some((ref hold_id, ref decision, ref instruction)) = hold_response {
450 return self
451 .execute_hold_response(&action, hold_id, decision, instruction.as_deref())
452 .await;
453 }
454
455 let lifecycle_op = if matches!(action.action_type, ActionType::Mutate) {
457 lifecycle::parse_lifecycle_op(&action.target, &action.payload)
458 } else {
459 None
460 };
461
462 if let Some((ref target_actor_id, ref op)) = lifecycle_op {
464 let initiator = self
465 .actor_store
466 .get(&action.actor_id)
467 .await?
468 .ok_or_else(|| KernelError::ActorNotFound(action.actor_id.clone()))?;
469 let target_actor = self
470 .actor_store
471 .get(target_actor_id)
472 .await?
473 .ok_or_else(|| KernelError::ActorNotFound(target_actor_id.clone()))?;
474 lifecycle::validate_lifecycle_authorization(&initiator, &target_actor, op).await?;
475 }
476
477 let create_actor_spec = parse_create_actor_spec(&action, &self.actor_store).await?;
478 let create_envelope_spec =
479 parse_create_envelope_spec(&action, &self.envelope_store).await?;
480 let policy_version = parse_policy_version(&action);
481
482 let (reserved_cost, reservation) = if let Some(hold_reserved) = action
488 .payload
489 .get("_hold_reserved_cost")
490 .and_then(|v| v.as_i64())
491 {
492 let phantom = if hold_reserved > 0 {
493 Some(EnergyReservation {
494 actor_id: action.actor_id.clone(),
495 reserved: hold_reserved,
496 })
497 } else {
498 None
499 };
500 (hold_reserved, phantom)
501 } else {
502 let cost = quote_cost(&action) as i64;
505 let res = if cost > 0 {
506 Some(self.energy_ledger.reserve(&action.actor_id, cost).await?)
507 } else {
508 None
509 };
510 (cost, res)
511 };
512
513 let artifact_hash = if matches!(action.action_type, ActionType::Execute) {
517 Some(Self::validate_execute_payload(&action.payload)?)
518 } else {
519 None
520 };
521
522 let settled_cost = reserved_cost;
525
526 let mut event = EventRecord {
528 id: Uuid::new_v4().to_string(),
529 log_index: 0,
530 event_hash: String::new(),
531 actor_id: action.actor_id.clone(),
532 action_type: action.action_type.as_str().to_string(),
533 target: action.target.clone(),
534 payload: action.payload.clone(),
535 payload_hash: payload_hash_hex(&action)?,
536 artifact_hash: artifact_hash.clone(),
537 reserved_energy: reserved_cost,
538 settled_energy: settled_cost,
539 timestamp: now_millis_string(),
540 };
541 self.finalize_energy_and_event(
542 reservation.as_ref(),
543 settled_cost,
544 create_actor_spec.as_ref(),
545 create_envelope_spec.as_ref(),
546 &mut event,
547 )
548 .await?;
549
550 if let Some(ref version) = policy_version {
552 if let Err(err) = self.state_store.set_policy_version(version).await {
553 warn!(error = %err, "failed to record policy version after commit");
554 } else {
555 info!(policy_version = %version, event_id = %event.id, "policy version updated");
556 }
557 }
558
559 if action.action_type.is_state_changing()
561 && settled_cost > 0
562 && let Ok(Some(actor)) = self.actor_store.get(&action.actor_id).await
563 && actor.actor_type == ActorType::Agent
564 && let Ok(Some(envelope)) = self
565 .envelope_store
566 .get_active_for_actor(&action.actor_id)
567 .await
568 {
569 let checkpoint = consent::check_checkpoint(&envelope, settled_cost);
571
572 let pool = self.state_store.pool();
574 let mut tx = pool.begin().await?;
575 match self
576 .envelope_store
577 .consume_budget_in_tx(&mut tx, &envelope.envelope_id, settled_cost)
578 .await
579 {
580 Ok(_new_consumed) => {
581 match checkpoint {
583 Some(CheckpointLevel::Halt) => {
584 self.envelope_store
585 .set_status_in_tx(
586 &mut tx,
587 &envelope.envelope_id,
588 &consent::EnvelopeStatus::Expired,
589 )
590 .await?;
591 info!(
592 envelope_id = %envelope.envelope_id,
593 actor_id = %action.actor_id,
594 "checkpoint: HALT — envelope budget exhausted"
595 );
596 }
597 Some(CheckpointLevel::Report) => {
598 info!(
599 envelope_id = %envelope.envelope_id,
600 actor_id = %action.actor_id,
601 budget = envelope.budget,
602 consumed = envelope.budget_consumed + settled_cost,
603 "checkpoint: REPORT — summary logged"
604 );
605 }
606 None => {}
607 }
608 tx.commit().await?;
609 }
610 Err(err) => {
611 warn!(
612 error = %err,
613 "envelope budget consumption failed (event already committed)"
614 );
615 }
617 }
618 }
619
620 if let Some((ref target_actor_id, ref op)) = lifecycle_op {
622 let pool = self.state_store.pool();
623 match op {
624 punkgo_core::actor::LifecycleOp::Freeze { .. } => {
625 match lifecycle::execute_freeze(&self.actor_store, &pool, target_actor_id).await
626 {
627 Ok(frozen_ids) => {
628 info!(
629 target = %target_actor_id,
630 cascade_count = frozen_ids.len(),
631 "lifecycle: freeze executed"
632 );
633 }
634 Err(err) => {
635 warn!(error = %err, "lifecycle: freeze failed (event already committed)");
636 }
637 }
638 }
639 punkgo_core::actor::LifecycleOp::Unfreeze => {
640 match lifecycle::execute_unfreeze(&self.actor_store, &pool, target_actor_id)
641 .await
642 {
643 Ok(()) => {
644 info!(target = %target_actor_id, "lifecycle: unfreeze executed");
645 }
646 Err(err) => {
647 warn!(error = %err, "lifecycle: unfreeze failed (event already committed)");
648 }
649 }
650 }
651 punkgo_core::actor::LifecycleOp::Terminate { .. } => {
652 match lifecycle::execute_freeze(&self.actor_store, &pool, target_actor_id).await
655 {
656 Ok(frozen_ids) => {
657 info!(
658 target = %target_actor_id,
659 cascade_count = frozen_ids.len(),
660 "lifecycle: terminate executed (cascade frozen)"
661 );
662 }
663 Err(err) => {
664 warn!(error = %err, "lifecycle: terminate failed (event already committed)");
665 }
666 }
667 }
668 punkgo_core::actor::LifecycleOp::UpdateEnergyShare { energy_share } => {
669 match lifecycle::execute_update_energy_share(
670 &self.actor_store,
671 &pool,
672 target_actor_id,
673 *energy_share,
674 )
675 .await
676 {
677 Ok(()) => {
678 info!(
679 target = %target_actor_id,
680 energy_share,
681 "lifecycle: energy_share updated"
682 );
683 }
684 Err(err) => {
685 warn!(error = %err, "lifecycle: update_energy_share failed (event already committed)");
686 }
687 }
688 }
689 }
690 }
691
692 Ok(SubmitReceipt {
693 event_id: event.id,
694 log_index: event.log_index,
695 event_hash: event.event_hash,
696 reserved_cost,
697 settled_cost,
698 artifact_hash,
699 })
700 }
701
702 async fn finalize_energy_and_event(
703 &self,
704 reservation: Option<&EnergyReservation>,
705 settled_cost: i64,
706 create_actor: Option<&CreateActorSpec>,
707 create_envelope: Option<&(String, EnvelopeSpec, Option<String>)>,
708 event: &mut EventRecord,
709 ) -> KernelResult<()> {
710 let pool = self.state_store.pool();
711 let mut tx = pool.begin().await?;
712
713 if let Some(res) = reservation {
715 self.energy_ledger
716 .settle_in_tx(&mut tx, &res.actor_id, res.reserved, settled_cost)
717 .await?;
718 }
719
720 if let Some(spec) = create_actor {
722 self.energy_ledger
723 .create_actor_in_tx(&mut tx, &spec.actor_id, spec.energy_balance)
724 .await?;
725 self.actor_store.create_in_tx(&mut tx, spec).await?;
726 info!(
727 created_actor = %spec.actor_id,
728 actor_type = spec.actor_type.as_str(),
729 energy_balance = spec.energy_balance,
730 "actor created in transaction"
731 );
732 }
733
734 if let Some((envelope_id, spec, parent_id)) = create_envelope {
736 self.envelope_store
737 .create_in_tx(&mut tx, envelope_id, spec, parent_id.as_deref())
738 .await?;
739 info!(
740 envelope_id = %envelope_id,
741 actor_id = %spec.actor_id,
742 grantor_id = %spec.grantor_id,
743 budget = spec.budget,
744 "envelope created in transaction"
745 );
746 }
747
748 self.event_log.append_in_tx(&mut tx, event).await?;
750
751 let log_index = event.log_index as u64;
753 self.audit_log
754 .append_leaf_in_tx(&mut tx, log_index, &event.event_hash)
755 .await
756 .map_err(|e| KernelError::Audit(e.to_string()))?;
757
758 tx.commit().await?;
763 info!(event_id = %event.id, log_index = event.log_index, "event committed");
764 Ok(())
765 }
766
767 fn validate_execute_payload(payload: &Value) -> KernelResult<String> {
772 Self::require_valid_oid(payload, "input_oid")?;
773 Self::require_valid_oid(payload, "output_oid")?;
774
775 if payload.get("exit_code").and_then(|v| v.as_i64()).is_none() {
776 return Err(KernelError::ExecutePayloadInvalid(
777 "missing or invalid exit_code (must be integer)".to_string(),
778 ));
779 }
780
781 let artifact_hash = Self::require_valid_oid(payload, "artifact_hash")?;
782 Ok(artifact_hash)
783 }
784
785 fn require_valid_oid(payload: &Value, field: &str) -> KernelResult<String> {
787 let val = payload
788 .get(field)
789 .and_then(|v| v.as_str())
790 .ok_or_else(|| KernelError::ExecutePayloadInvalid(format!("missing {field}")))?;
791
792 if !val.starts_with("sha256:") || val.len() != 71 {
793 return Err(KernelError::ExecutePayloadInvalid(format!(
794 "{field} must be sha256:<64 hex chars>, got: {val}"
795 )));
796 }
797 let hex_part = &val[7..];
798 if !hex_part.chars().all(|c| c.is_ascii_hexdigit()) {
799 return Err(KernelError::ExecutePayloadInvalid(format!(
800 "{field} contains non-hex characters: {val}"
801 )));
802 }
803
804 Ok(val.to_string())
805 }
806
807 async fn read_query(&self, query: ReadQuery) -> KernelResult<Value> {
808 let requester = query.requester_id.as_deref().unwrap_or("anonymous");
813 check_read_access(requester, &query.kind)?;
814
815 match query.kind.as_str() {
816 "health" => Ok(json!({ "status": "ok" })),
817 "actor_energy" => {
818 let actor_id = query.actor_id.ok_or_else(|| {
819 KernelError::InvalidRequest("actor_id is required for actor_energy".to_string())
820 })?;
821 let (energy_balance, reserved_energy) =
822 self.energy_ledger.balance_view(&actor_id).await?;
823 Ok(json!({
824 "actor_id": actor_id,
825 "energy_balance": energy_balance,
826 "reserved_energy": reserved_energy
827 }))
828 }
829 "events" => {
830 let limit = query.limit.unwrap_or(20).clamp(1, 500);
831 let events = self
832 .event_log
833 .query(
834 query.actor_id.as_deref(),
835 query.before_index,
836 query.after_index,
837 limit,
838 )
839 .await?;
840 let next_cursor = events.last().map(|e| e.log_index);
843 let has_more = events.len() as i64 == limit;
844 Ok(json!({
845 "events": events,
846 "has_more": has_more,
847 "next_cursor": next_cursor
848 }))
849 }
850 "stats" => {
851 let event_count = self.event_log.count().await?;
852 Ok(json!({ "event_count": event_count }))
853 }
854 "snapshot" => {
855 let event_count = self.event_log.count().await?;
858 self.audit_log
859 .ensure_checkpoint(event_count as u64)
860 .await
861 .map_err(|e| KernelError::Audit(e.to_string()))?;
862 let cp = self
863 .audit_log
864 .latest_checkpoint()
865 .await
866 .map_err(|e| KernelError::Audit(e.to_string()))?;
867 Ok(json!({
868 "event_count": event_count,
869 "snapshot_hash": cp.root_hash,
870 "generated_at": cp.created_at
871 }))
872 }
873 "paths" => {
874 let paths = self.state_store.paths();
875 Ok(json!({
876 "root": paths.root.display().to_string(),
877 "workspace_root": paths.workspace_root.display().to_string(),
878 "quarantine_root": paths.quarantine_root.display().to_string(),
879 "db_path": paths.db_path.display().to_string()
880 }))
881 }
882 "audit_checkpoint" => {
883 let event_count = self.event_log.count().await?;
884 self.audit_log
885 .ensure_checkpoint(event_count as u64)
886 .await
887 .map_err(|e| KernelError::Audit(e.to_string()))?;
888 let cp = self
889 .audit_log
890 .latest_checkpoint()
891 .await
892 .map_err(|e| KernelError::Audit(e.to_string()))?;
893 Ok(serde_json::to_value(cp)?)
894 }
895 "audit_inclusion_proof" => {
896 let log_index = query.log_index.ok_or_else(|| {
897 KernelError::InvalidRequest(
898 "log_index is required for audit_inclusion_proof".to_string(),
899 )
900 })? as u64;
901 let tree_size = match query.tree_size {
902 Some(s) => s as u64,
903 None => {
904 let event_count = self.event_log.count().await? as u64;
906 self.audit_log
907 .ensure_checkpoint(event_count)
908 .await
909 .map_err(|e| KernelError::Audit(e.to_string()))?;
910 self.audit_log
911 .tree_size()
912 .await
913 .map_err(|e| KernelError::Audit(e.to_string()))?
914 }
915 };
916 let proof = self
917 .audit_log
918 .inclusion_proof(log_index, tree_size)
919 .await
920 .map_err(|e| KernelError::Audit(e.to_string()))?;
921 Ok(json!({
922 "log_index": log_index,
923 "tree_size": tree_size,
924 "proof": proof
925 }))
926 }
927 "audit_consistency_proof" => {
928 let old_size = query.old_size.ok_or_else(|| {
929 KernelError::InvalidRequest(
930 "old_size is required for audit_consistency_proof".to_string(),
931 )
932 })? as u64;
933 let new_size = query.tree_size.ok_or_else(|| {
934 KernelError::InvalidRequest(
935 "tree_size is required for audit_consistency_proof".to_string(),
936 )
937 })? as u64;
938 let proof = self
939 .audit_log
940 .consistency_proof(old_size, new_size)
941 .await
942 .map_err(|e| KernelError::Audit(e.to_string()))?;
943 Ok(json!({
944 "old_size": old_size,
945 "new_size": new_size,
946 "proof": proof
947 }))
948 }
949 "stellar_info" => {
951 let config = &self.stellar_config;
952 Ok(json!({
953 "gpu_model": config.gpu_model,
954 "cpu_model": config.cpu_model,
955 "int8_tops": config.int8_tops,
956 "energy_per_tick": config.effective_energy_per_tick(),
957 "tick_interval_ms": config.tick_interval_ms,
958 "luminosity_source": config.luminosity_source
959 }))
960 }
961 "envelope_info" => {
963 let actor_id = query.actor_id.ok_or_else(|| {
964 KernelError::InvalidRequest(
965 "actor_id is required for envelope_info".to_string(),
966 )
967 })?;
968 let envelope = self.envelope_store.get_active_for_actor(&actor_id).await?;
969 match envelope {
970 Some(record) => Ok(serde_json::to_value(record)?),
971 None => Ok(json!({ "actor_id": actor_id, "envelope": null })),
972 }
973 }
974 "actor_info" => {
976 let actor_id = query.actor_id.ok_or_else(|| {
977 KernelError::InvalidRequest("actor_id is required for actor_info".to_string())
978 })?;
979 let actor = self.actor_store.get(&actor_id).await?;
980 match actor {
981 Some(record) => Ok(serde_json::to_value(record)?),
982 None => Err(KernelError::ActorNotFound(actor_id)),
983 }
984 }
985 "hold_info" => {
987 let hold_id = query.hold_id.ok_or_else(|| {
988 KernelError::InvalidRequest("hold_id is required for hold_info".to_string())
989 })?;
990 let hold = self.envelope_store.get_hold_request(&hold_id).await?;
991 match hold {
992 Some(record) => Ok(record),
993 None => Err(KernelError::InvalidRequest(format!(
994 "hold_request not found: {hold_id}"
995 ))),
996 }
997 }
998 "holds_pending" => {
1000 let holds = self
1001 .envelope_store
1002 .list_pending_holds(query.actor_id.as_deref())
1003 .await?;
1004 Ok(json!({ "holds": holds }))
1005 }
1006 other => Err(KernelError::InvalidRequest(format!(
1007 "unsupported read query kind: {other}"
1008 ))),
1009 }
1010 }
1011}
1012
1013fn now_millis_string() -> String {
1014 let now = std::time::SystemTime::now()
1015 .duration_since(std::time::UNIX_EPOCH)
1016 .unwrap_or_default();
1017 now.as_millis().to_string()
1018}
1019
1020fn now_millis_u64() -> u64 {
1021 std::time::SystemTime::now()
1022 .duration_since(std::time::UNIX_EPOCH)
1023 .unwrap_or_default()
1024 .as_millis() as u64
1025}
1026
1027fn parse_policy_version(action: &Action) -> Option<String> {
1030 if !matches!(action.action_type, ActionType::Create) || action.target != "system/policy" {
1031 return None;
1032 }
1033 action
1034 .payload
1035 .get("version")
1036 .and_then(|v| v.as_str())
1037 .map(|s| s.to_string())
1038}
1039
1040async fn parse_create_actor_spec(
1047 action: &Action,
1048 actor_store: &ActorStore,
1049) -> KernelResult<Option<CreateActorSpec>> {
1050 if !matches!(action.action_type, ActionType::Create) || action.target != "ledger/actor" {
1051 return Ok(None);
1052 }
1053
1054 let payload = action.payload.as_object().ok_or_else(|| {
1055 KernelError::InvalidRequest("seed actor payload must be an object".to_string())
1056 })?;
1057
1058 let explicit_id = payload.get("actor_id").and_then(Value::as_str);
1060 let purpose = payload
1061 .get("purpose")
1062 .and_then(Value::as_str)
1063 .unwrap_or("legacy");
1064 let energy_balance = payload
1065 .get("energy_balance")
1066 .and_then(Value::as_i64)
1067 .unwrap_or(1000);
1068
1069 let actor_type_str = payload
1071 .get("actor_type")
1072 .and_then(Value::as_str)
1073 .unwrap_or("agent");
1074 let actor_type = ActorType::parse(actor_type_str).ok_or_else(|| {
1075 KernelError::InvalidRequest(format!("invalid actor_type: {actor_type_str}"))
1076 })?;
1077
1078 let actor_id = if let Some(id) = explicit_id {
1080 id.to_string()
1081 } else {
1082 if purpose == "legacy" {
1083 return Err(KernelError::InvalidRequest(
1084 "actor_id or purpose is required to create an actor".to_string(),
1085 ));
1086 }
1087 let seq = actor_store.next_sequence(&action.actor_id, purpose).await?;
1088 derive_agent_id(&action.actor_id, purpose, seq)
1089 };
1090
1091 let creator_record = actor_store.get(&action.actor_id).await?;
1093 let (creator_type, creator_lineage) = match &creator_record {
1094 Some(record) => (record.actor_type.clone(), record.lineage.clone()),
1095 None => (ActorType::Human, vec![]),
1097 };
1098
1099 if creator_type == ActorType::Agent && actor_type == ActorType::Agent {
1101 return Err(KernelError::PolicyViolation(
1102 "agents cannot create agents — creation right belongs to humans (PIP-001 §5)"
1103 .to_string(),
1104 ));
1105 }
1106
1107 let lineage = build_lineage(&creator_type, &action.actor_id, &creator_lineage);
1108
1109 let writable_targets: Vec<WritableTarget> = if let Some(targets_val) =
1111 payload.get("writable_targets")
1112 {
1113 serde_json::from_value(targets_val.clone())
1114 .map_err(|e| KernelError::InvalidRequest(format!("invalid writable_targets: {e}")))?
1115 } else {
1116 vec![]
1117 };
1118
1119 if !writable_targets.is_empty()
1121 && let Some(ref creator) = creator_record
1122 {
1123 validate_child_targets(creator, &writable_targets)?;
1124 }
1125
1126 let energy_share = payload
1128 .get("energy_share")
1129 .and_then(Value::as_f64)
1130 .unwrap_or(0.0);
1131
1132 let reduction_policy = payload
1133 .get("reduction_policy")
1134 .and_then(Value::as_str)
1135 .unwrap_or("none")
1136 .to_string();
1137
1138 Ok(Some(CreateActorSpec {
1139 actor_id,
1140 actor_type,
1141 creator_id: action.actor_id.clone(),
1142 lineage,
1143 purpose: Some(purpose.to_string()),
1144 writable_targets,
1145 energy_balance,
1146 energy_share,
1147 reduction_policy,
1148 }))
1149}
1150
1151async fn parse_create_envelope_spec(
1160 action: &Action,
1161 envelope_store: &EnvelopeStore,
1162) -> KernelResult<Option<(String, EnvelopeSpec, Option<String>)>> {
1163 if !matches!(action.action_type, ActionType::Create) || action.target != "ledger/envelope" {
1164 return Ok(None);
1165 }
1166
1167 let payload = action.payload.as_object().ok_or_else(|| {
1168 KernelError::InvalidRequest("envelope payload must be an object".to_string())
1169 })?;
1170
1171 let actor_id = payload
1172 .get("actor_id")
1173 .and_then(Value::as_str)
1174 .ok_or_else(|| {
1175 KernelError::InvalidRequest("actor_id is required to create an envelope".to_string())
1176 })?
1177 .to_string();
1178
1179 let budget = payload
1180 .get("budget")
1181 .and_then(Value::as_i64)
1182 .ok_or_else(|| {
1183 KernelError::InvalidRequest("budget is required to create an envelope".to_string())
1184 })?;
1185
1186 if budget <= 0 {
1187 return Err(KernelError::InvalidRequest(
1188 "envelope budget must be positive".to_string(),
1189 ));
1190 }
1191
1192 let targets: Vec<String> = if let Some(targets_val) = payload.get("targets") {
1193 serde_json::from_value(targets_val.clone())
1194 .map_err(|e| KernelError::InvalidRequest(format!("invalid envelope targets: {e}")))?
1195 } else {
1196 return Err(KernelError::InvalidRequest(
1197 "targets are required to create an envelope".to_string(),
1198 ));
1199 };
1200
1201 let actions: Vec<String> = if let Some(actions_val) = payload.get("actions") {
1202 serde_json::from_value(actions_val.clone())
1203 .map_err(|e| KernelError::InvalidRequest(format!("invalid envelope actions: {e}")))?
1204 } else {
1205 return Err(KernelError::InvalidRequest(
1206 "actions are required to create an envelope".to_string(),
1207 ));
1208 };
1209
1210 let duration_secs = payload.get("duration_secs").and_then(Value::as_i64);
1211 let report_every = payload.get("report_every").and_then(Value::as_i64);
1212 let hold_timeout_secs = payload.get("hold_timeout_secs").and_then(Value::as_i64);
1213
1214 let hold_on: Vec<HoldRule> = if let Some(hold_on_val) = payload.get("hold_on") {
1216 serde_json::from_value(hold_on_val.clone())
1217 .map_err(|e| KernelError::InvalidRequest(format!("invalid hold_on rules: {e}")))?
1218 } else {
1219 vec![]
1220 };
1221
1222 let spec = EnvelopeSpec {
1223 actor_id,
1224 grantor_id: action.actor_id.clone(),
1225 budget,
1226 targets,
1227 actions,
1228 duration_secs,
1229 report_every,
1230 hold_on,
1231 hold_timeout_secs,
1232 };
1233
1234 let parent_envelope_id = if let Some(grantor_envelope) = envelope_store
1236 .get_active_for_actor(&action.actor_id)
1237 .await?
1238 {
1239 consent::validate_envelope_reduction(&grantor_envelope, &spec)?;
1240 Some(grantor_envelope.envelope_id)
1241 } else {
1242 None
1243 };
1244
1245 let envelope_id = Uuid::new_v4().to_string();
1246
1247 Ok(Some((envelope_id, spec, parent_envelope_id)))
1248}
1249
1250fn parse_hold_response(target: &str, payload: &Value) -> Option<(String, String, Option<String>)> {
1258 let rest = target.strip_prefix("ledger/hold/")?;
1260 if rest.is_empty() || rest.contains('/') {
1262 return None;
1263 }
1264 let hold_id = rest.to_string();
1265
1266 let decision = payload.get("decision").and_then(Value::as_str)?.to_string();
1267
1268 if decision != "approve" && decision != "reject" {
1270 return None;
1271 }
1272
1273 let instruction = payload
1274 .get("instruction")
1275 .and_then(Value::as_str)
1276 .map(|s| s.to_string());
1277
1278 Some((hold_id, decision, instruction))
1279}
1280
1281impl Kernel {
1282 async fn execute_hold_response(
1292 &self,
1293 human_action: &Action,
1294 hold_id: &str,
1295 decision: &str,
1296 instruction: Option<&str>,
1297 ) -> KernelResult<SubmitReceipt> {
1298 let caller = self
1300 .actor_store
1301 .get(&human_action.actor_id)
1302 .await?
1303 .ok_or_else(|| KernelError::ActorNotFound(human_action.actor_id.clone()))?;
1304
1305 if caller.actor_type != punkgo_core::actor::ActorType::Human {
1306 return Err(KernelError::PolicyViolation(format!(
1307 "only Human actors can resolve holds; caller {} is {:?}",
1308 human_action.actor_id, caller.actor_type
1309 )));
1310 }
1311
1312 let hold_record = self
1314 .envelope_store
1315 .get_hold_request(hold_id)
1316 .await?
1317 .ok_or_else(|| {
1318 KernelError::PolicyViolation(format!("hold_request not found: {hold_id}"))
1319 })?;
1320
1321 let _envelope_id = hold_record
1322 .get("envelope_id")
1323 .and_then(Value::as_str)
1324 .ok_or_else(|| {
1325 KernelError::PolicyViolation("hold_request missing envelope_id".to_string())
1326 })?
1327 .to_string();
1328
1329 let agent_id = hold_record
1330 .get("agent_id")
1331 .and_then(Value::as_str)
1332 .ok_or_else(|| {
1333 KernelError::PolicyViolation("hold_request missing agent_id".to_string())
1334 })?
1335 .to_string();
1336
1337 let pending_payload = hold_record
1338 .get("pending_payload")
1339 .cloned()
1340 .unwrap_or(Value::Null);
1341
1342 let trigger_target = hold_record
1343 .get("trigger_target")
1344 .and_then(Value::as_str)
1345 .unwrap_or("")
1346 .to_string();
1347
1348 let trigger_action = hold_record
1349 .get("trigger_action")
1350 .and_then(Value::as_str)
1351 .unwrap_or("")
1352 .to_string();
1353
1354 let status = hold_record
1355 .get("status")
1356 .and_then(Value::as_str)
1357 .unwrap_or("");
1358 if status != "pending" {
1359 return Err(KernelError::PolicyViolation(format!(
1360 "hold_request {hold_id} is already resolved (status={status})"
1361 )));
1362 }
1363
1364 let envelope_id_str = hold_record
1367 .get("envelope_id")
1368 .and_then(Value::as_str)
1369 .unwrap_or("")
1370 .to_string();
1371 if let Ok(Some(env)) = self.envelope_store.get(&envelope_id_str).await {
1372 if let Some(timeout_secs) = env.hold_timeout_secs {
1373 if timeout_secs > 0 {
1374 let triggered_at: u64 = hold_record
1375 .get("triggered_at")
1376 .and_then(|v| v.as_str())
1377 .and_then(|s| s.parse().ok())
1378 .unwrap_or(0);
1379 let now = now_millis_u64();
1380 if now.saturating_sub(triggered_at) > (timeout_secs as u64) * 1000 {
1381 self.expire_timed_out_holds(&envelope_id_str, timeout_secs)
1383 .await?;
1384 return Err(KernelError::PolicyViolation(format!(
1385 "hold_request {hold_id} has timed out and was auto-rejected"
1386 )));
1387 }
1388 }
1389 }
1390 }
1391
1392 let response_payload = json!({
1394 "hold_id": hold_id,
1395 "agent_id": &agent_id,
1396 "decision": decision,
1397 "instruction": instruction,
1398 "trigger": {
1399 "target": &trigger_target,
1400 "action_type": &trigger_action
1401 },
1402 "resolved_by": &human_action.actor_id,
1403 "resolved_at": now_millis_string()
1404 });
1405
1406 let response_action = Action {
1407 actor_id: human_action.actor_id.clone(),
1408 action_type: ActionType::Mutate,
1409 target: format!("ledger/hold/{hold_id}"),
1410 payload: response_payload.clone(),
1411 timestamp: None,
1412 };
1413
1414 let pool = self.state_store.pool();
1417 let mut tx = pool.begin().await?;
1418
1419 self.envelope_store
1420 .resolve_hold_request_in_tx(&mut tx, hold_id, decision, instruction)
1421 .await?;
1422
1423 let hold_reserved_cost = pending_payload
1426 .get("reserved_cost")
1427 .and_then(|v| v.as_i64())
1428 .unwrap_or(0);
1429
1430 let commitment_cost = if decision == "reject" && hold_reserved_cost > 0 {
1431 let cost = ((hold_reserved_cost as f64) * 0.2).ceil() as i64;
1432 self.energy_ledger
1433 .settle_in_tx(&mut tx, &agent_id, hold_reserved_cost, cost)
1434 .await?;
1435 cost
1436 } else {
1437 0
1438 };
1439
1440 let mut response_event = EventRecord {
1441 id: Uuid::new_v4().to_string(),
1442 log_index: 0,
1443 event_hash: String::new(),
1444 actor_id: human_action.actor_id.clone(),
1445 action_type: "hold_response".to_string(),
1446 target: format!("ledger/hold/{hold_id}"),
1447 payload: response_payload.clone(),
1448 payload_hash: payload_hash_hex(&response_action)?,
1449 artifact_hash: None,
1450 reserved_energy: hold_reserved_cost,
1451 settled_energy: commitment_cost,
1452 timestamp: now_millis_string(),
1453 };
1454 self.event_log
1455 .append_in_tx(&mut tx, &mut response_event)
1456 .await?;
1457
1458 let log_index = response_event.log_index as u64;
1460 self.audit_log
1461 .append_leaf_in_tx(&mut tx, log_index, &response_event.event_hash)
1462 .await
1463 .map_err(|e| KernelError::Audit(e.to_string()))?;
1464 tx.commit().await?;
1466
1467 info!(
1468 hold_id = %hold_id,
1469 decision = %decision,
1470 agent_id = %agent_id,
1471 resolved_by = %human_action.actor_id,
1472 "PIP-001 §11d: hold resolved"
1473 );
1474
1475 if decision == "approve" {
1477 let orig_target = pending_payload
1479 .get("target")
1480 .and_then(Value::as_str)
1481 .unwrap_or(&trigger_target)
1482 .to_string();
1483 let orig_action_type_str = pending_payload
1484 .get("action_type")
1485 .and_then(Value::as_str)
1486 .unwrap_or(&trigger_action)
1487 .to_string();
1488 let mut orig_payload = pending_payload
1489 .get("payload")
1490 .cloned()
1491 .unwrap_or(Value::Null);
1492
1493 if let Some(obj) = orig_payload.as_object_mut() {
1500 obj.insert("_hold_approved".to_string(), Value::Bool(true));
1501 obj.insert(
1502 "_hold_reserved_cost".to_string(),
1503 Value::Number(hold_reserved_cost.into()),
1504 );
1505 if let Some(instr) = instruction {
1507 obj.insert(
1508 "_hold_instruction".to_string(),
1509 Value::String(instr.to_string()),
1510 );
1511 }
1512 } else {
1513 let mut obj = serde_json::Map::new();
1515 obj.insert("_hold_approved".to_string(), Value::Bool(true));
1516 obj.insert(
1517 "_hold_reserved_cost".to_string(),
1518 Value::Number(hold_reserved_cost.into()),
1519 );
1520 if let Some(instr) = instruction {
1521 obj.insert(
1522 "_hold_instruction".to_string(),
1523 Value::String(instr.to_string()),
1524 );
1525 }
1526 orig_payload = Value::Object(obj);
1527 }
1528
1529 let orig_action_type = match orig_action_type_str.as_str() {
1530 "observe" => ActionType::Observe,
1531 "create" => ActionType::Create,
1532 "mutate" => ActionType::Mutate,
1533 _ => ActionType::Execute,
1534 };
1535
1536 let pending_action = Action {
1537 actor_id: agent_id.clone(),
1538 action_type: orig_action_type,
1539 target: orig_target,
1540 payload: orig_payload,
1541 timestamp: None,
1542 };
1543
1544 info!(
1545 hold_id = %hold_id,
1546 agent_id = %agent_id,
1547 "PIP-001 §11d: re-executing approved pending action"
1548 );
1549
1550 return Box::pin(self.submit_action(pending_action)).await;
1554 }
1555
1556 Ok(SubmitReceipt {
1558 event_id: response_event.id,
1559 log_index: response_event.log_index,
1560 event_hash: response_event.event_hash,
1561 reserved_cost: hold_reserved_cost,
1562 settled_cost: commitment_cost,
1563 artifact_hash: None,
1564 })
1565 }
1566
1567 async fn expire_timed_out_holds(
1574 &self,
1575 envelope_id: &str,
1576 hold_timeout_secs: i64,
1577 ) -> KernelResult<()> {
1578 let holds = self
1579 .envelope_store
1580 .list_pending_holds_for_envelope(envelope_id)
1581 .await?;
1582 let now = now_millis_u64();
1583
1584 for hold in holds {
1585 let triggered_at: u64 = hold
1586 .get("triggered_at")
1587 .and_then(|v| v.as_str())
1588 .and_then(|s| s.parse().ok())
1589 .unwrap_or(0);
1590
1591 if now.saturating_sub(triggered_at) <= (hold_timeout_secs as u64) * 1000 {
1592 continue;
1593 }
1594
1595 let hold_id = hold.get("hold_id").and_then(|v| v.as_str()).unwrap_or("");
1597 let agent_id = hold.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
1598 let pending_payload = hold.get("pending_payload").cloned().unwrap_or(Value::Null);
1599 let reserved_cost = pending_payload
1600 .get("reserved_cost")
1601 .and_then(|v| v.as_i64())
1602 .unwrap_or(0);
1603
1604 let commitment_cost = if reserved_cost > 0 {
1605 ((reserved_cost as f64) * 0.2).ceil() as i64
1606 } else {
1607 0
1608 };
1609
1610 let timeout_payload = json!({
1611 "hold_id": hold_id,
1612 "agent_id": agent_id,
1613 "decision": "timed_out",
1614 "reserved_cost": reserved_cost,
1615 "commitment_cost": commitment_cost,
1616 "triggered_at": triggered_at.to_string(),
1617 "expired_at": now.to_string()
1618 });
1619 let timeout_action = Action {
1620 actor_id: agent_id.to_string(),
1621 action_type: ActionType::Mutate,
1622 target: format!("ledger/hold/{hold_id}"),
1623 payload: timeout_payload.clone(),
1624 timestamp: None,
1625 };
1626 let mut timeout_event = EventRecord {
1627 id: Uuid::new_v4().to_string(),
1628 log_index: 0,
1629 event_hash: String::new(),
1630 actor_id: agent_id.to_string(),
1631 action_type: "hold_timeout".to_string(),
1632 target: format!("ledger/hold/{hold_id}"),
1633 payload: timeout_payload,
1634 payload_hash: payload_hash_hex(&timeout_action)?,
1635 artifact_hash: None,
1636 reserved_energy: reserved_cost,
1637 settled_energy: commitment_cost,
1638 timestamp: now_millis_string(),
1639 };
1640
1641 let pool = self.state_store.pool();
1642 let mut tx = pool.begin().await?;
1643 self.envelope_store
1644 .resolve_hold_request_in_tx(&mut tx, hold_id, "timed_out", None)
1645 .await?;
1646 if reserved_cost > 0 {
1647 self.energy_ledger
1648 .settle_in_tx(&mut tx, agent_id, reserved_cost, commitment_cost)
1649 .await?;
1650 }
1651 self.event_log
1652 .append_in_tx(&mut tx, &mut timeout_event)
1653 .await?;
1654
1655 let t_log_index = timeout_event.log_index as u64;
1657 self.audit_log
1658 .append_leaf_in_tx(&mut tx, t_log_index, &timeout_event.event_hash)
1659 .await
1660 .map_err(|e| KernelError::Audit(e.to_string()))?;
1661 tx.commit().await?;
1663
1664 info!(
1665 hold_id = %hold_id,
1666 agent_id = %agent_id,
1667 commitment_cost = commitment_cost,
1668 "PIP-001 §11e: hold auto-expired (timeout)"
1669 );
1670 }
1671 Ok(())
1672 }
1673}