1use super::artifact::{ArtifactStore, ArtifactType, PutArtifactRequest};
67use super::enforcement::{
68 EnforcementMiddleware, EnforcementResult, ExecutionUsage, LongRunningExecutionPolicy,
69};
70use super::error::ExecutionError;
71use super::execution_model::Execution;
72use super::execution_state::{ExecutionState, StepState, WaitReason};
73use super::ids::{ArtifactId, ExecutionId, SpawnMode, StepId, StepType};
74use super::persistence::{ExecutionSnapshot, StateStore};
75use super::reducer::{reduce, ExecutionAction, ReducerError};
76use crate::context::TenantContext;
77use crate::graph::{CompiledGraph, NodeState};
78use crate::inbox::{ControlAction, InboxMessage, InboxStore};
79use crate::signal::SignalBus;
80use crate::streaming::{EventEmitter, ProtectedEventEmitter, StreamEvent};
81use std::sync::Arc;
82use std::time::Instant;
83use tokio_util::sync::CancellationToken;
84
/// Outcome of draining the inbox, reported by `check_inbox` to the graph loop.
#[derive(Clone, Debug)]
enum InboxAction {
    /// Nothing in the inbox requires the loop to change course.
    Continue,
    /// A pause control message was applied to the execution.
    Pause,
    /// A cancel control message arrived; the payload is the cancellation reason.
    Cancel(String),
}
95
96pub struct ExecutionKernel {
109 execution: Execution,
111 tenant_context: TenantContext,
113 emitter: EventEmitter,
115 protected_emitter: Option<ProtectedEventEmitter>,
121 cancellation_token: CancellationToken,
123 inbox: Option<Arc<dyn InboxStore>>,
125 state_store: Option<Arc<dyn StateStore>>,
127 signal_bus: Option<Arc<dyn SignalBus>>,
129 artifact_store: Option<Arc<dyn ArtifactStore>>,
131 enforcement: Arc<EnforcementMiddleware>,
136 long_running_policy: LongRunningExecutionPolicy,
141 usage: Option<Arc<ExecutionUsage>>,
143 spawn_mode: Option<SpawnMode>,
152 parent_execution_id: Option<ExecutionId>,
156}
157
158impl ExecutionKernel {
159 pub fn new(tenant_context: TenantContext) -> Self {
168 let mut execution = Execution::new();
169 execution.tenant_id = Some(tenant_context.tenant_id.clone());
170 Self {
171 execution,
172 tenant_context,
173 emitter: EventEmitter::new(),
174 protected_emitter: None,
175 cancellation_token: CancellationToken::new(),
176 inbox: None,
177 state_store: None,
178 signal_bus: None,
179 artifact_store: None,
180 enforcement: Arc::new(EnforcementMiddleware::new()),
181 long_running_policy: LongRunningExecutionPolicy::standard(),
182 usage: None,
183 spawn_mode: None,
184 parent_execution_id: None,
185 }
186 }
187
188 pub fn with_execution(execution: Execution, tenant_context: TenantContext) -> Self {
194 Self {
196 execution,
197 tenant_context,
198 emitter: EventEmitter::new(),
199 protected_emitter: None,
200 cancellation_token: CancellationToken::new(),
201 inbox: None,
202 state_store: None,
203 signal_bus: None,
204 artifact_store: None,
205 enforcement: Arc::new(EnforcementMiddleware::new()),
206 long_running_policy: LongRunningExecutionPolicy::standard(),
207 usage: None,
208 spawn_mode: None,
209 parent_execution_id: None,
210 }
211 }
212
213 pub fn with_protected_emitter(mut self, emitter: ProtectedEventEmitter) -> Self {
230 self.protected_emitter = Some(emitter);
231 self
232 }
233
234 pub fn with_inbox(mut self, inbox: Arc<dyn InboxStore>) -> Self {
239 self.inbox = Some(inbox);
240 self
241 }
242
243 pub fn inbox(&self) -> Option<&Arc<dyn InboxStore>> {
245 self.inbox.as_ref()
246 }
247
248 pub fn with_state_store(mut self, state_store: Arc<dyn StateStore>) -> Self {
250 self.state_store = Some(state_store);
251 self
252 }
253
254 pub fn state_store(&self) -> Option<&Arc<dyn StateStore>> {
256 self.state_store.as_ref()
257 }
258
259 pub fn with_signal_bus(mut self, signal_bus: Arc<dyn SignalBus>) -> Self {
261 self.signal_bus = Some(signal_bus);
262 self
263 }
264
265 pub fn signal_bus(&self) -> Option<&Arc<dyn SignalBus>> {
267 self.signal_bus.as_ref()
268 }
269
270 pub fn with_artifact_store(mut self, store: Arc<dyn ArtifactStore>) -> Self {
275 self.artifact_store = Some(store);
276 self
277 }
278
279 pub fn artifact_store(&self) -> Option<&Arc<dyn ArtifactStore>> {
281 self.artifact_store.as_ref()
282 }
283
284 pub fn with_enforcement(mut self, enforcement: Arc<EnforcementMiddleware>) -> Self {
289 self.enforcement = enforcement;
290 self
291 }
292
293 pub fn enforcement(&self) -> &Arc<EnforcementMiddleware> {
295 &self.enforcement
296 }
297
298 pub fn with_long_running_policy(mut self, policy: LongRunningExecutionPolicy) -> Self {
303 self.long_running_policy = policy;
304 self
305 }
306
307 pub fn long_running_policy(&self) -> &LongRunningExecutionPolicy {
309 &self.long_running_policy
310 }
311
312 pub fn with_spawn_mode(mut self, spawn_mode: SpawnMode) -> Self {
321 self.spawn_mode = Some(spawn_mode);
322 self
323 }
324
325 pub fn spawn_mode(&self) -> Option<&SpawnMode> {
327 self.spawn_mode.as_ref()
328 }
329
330 pub fn with_parent_execution_id(mut self, parent_id: ExecutionId) -> Self {
335 self.parent_execution_id = Some(parent_id);
336 self
337 }
338
339 pub fn parent_execution_id(&self) -> Option<&ExecutionId> {
341 self.parent_execution_id.as_ref()
342 }
343
344 pub fn usage_snapshot(&self) -> Option<super::enforcement::UsageSnapshot> {
346 self.usage
347 .as_ref()
348 .map(|u| super::enforcement::UsageSnapshot::from(u.as_ref()))
349 }
350
351 pub async fn register_for_enforcement(&mut self) -> Arc<ExecutionUsage> {
356 let usage = self
357 .enforcement
358 .register_execution(
359 self.execution.id.clone(),
360 self.tenant_context.tenant_id.clone(),
361 )
362 .await;
363 self.usage = Some(Arc::clone(&usage));
364 usage
365 }
366
367 pub async fn unregister_from_enforcement(&self) {
371 self.enforcement
372 .unregister_execution(&self.execution.id)
373 .await;
374 }
375
376 pub async fn check_limits_before_step(&self) -> Result<(), ExecutionError> {
384 let basic_result = self
386 .enforcement
387 .check_all_limits(&self.execution.id, &self.tenant_context.limits)
388 .await;
389
390 if let EnforcementResult::Blocked(violation) = basic_result {
391 return Err(violation.to_error());
392 }
393
394 let long_running_result = self
396 .enforcement
397 .check_long_running_limits(&self.execution.id, &self.long_running_policy)
398 .await;
399
400 if let EnforcementResult::Blocked(violation) = long_running_result {
401 return Err(violation.to_error());
402 }
403
404 if self.enforcement.emit_warning_events_enabled() {
406 if let Some(warning) = match (&basic_result, &long_running_result) {
407 (EnforcementResult::Warning(w), _) => Some(w),
408 (_, EnforcementResult::Warning(w)) => Some(w),
409 _ => None,
410 } {
411 tracing::warn!(
412 execution_id = %self.execution.id,
413 warning_type = ?warning.warning_type,
414 usage_percent = warning.usage_percent,
415 message = %warning.message,
416 "Enforcement warning"
417 );
418 self.emitter.emit(StreamEvent::policy_decision_warn(
419 &self.execution.id,
420 None,
421 "enforcement",
422 warning.message.clone(),
423 ));
424 }
425 }
426 Ok(())
427 }
428
429 pub async fn record_step_completed(&self) {
431 self.enforcement.record_step(&self.execution.id).await;
432 }
433
434 pub async fn record_token_usage(&self, input_tokens: u32, output_tokens: u32) {
436 self.enforcement
437 .record_tokens(&self.execution.id, input_tokens, output_tokens)
438 .await;
439 }
440
441 pub async fn record_cost(&self, cost_usd: f64) {
443 self.enforcement
444 .record_cost(&self.execution.id, cost_usd)
445 .await;
446 }
447
448 pub async fn record_discovered_step(&self) {
450 self.enforcement
451 .record_discovered_step(&self.execution.id)
452 .await;
453 }
454
455 pub async fn push_discovery_depth(&self) {
457 self.enforcement
458 .push_discovery_depth(&self.execution.id)
459 .await;
460 }
461
462 pub async fn pop_discovery_depth(&self) {
464 self.enforcement
465 .pop_discovery_depth(&self.execution.id)
466 .await;
467 }
468
469 pub async fn store_artifact(
485 &self,
486 step_id: &StepId,
487 name: impl Into<String>,
488 artifact_type: ArtifactType,
489 content: Vec<u8>,
490 ) -> Option<ArtifactId> {
491 let store = self.artifact_store.as_ref()?;
492
493 let request = PutArtifactRequest::new(
494 self.execution.id.clone(),
495 step_id.clone(),
496 name,
497 artifact_type,
498 content,
499 );
500
501 match store.put(request).await {
502 Ok(response) => {
503 let artifact_type_str = format!("{:?}", artifact_type);
505 self.emitter.emit(StreamEvent::artifact_created(
506 &self.execution.id,
507 step_id,
508 &response.artifact_id,
509 artifact_type_str,
510 ));
511
512 tracing::debug!(
513 execution_id = %self.execution.id,
514 step_id = %step_id,
515 artifact_id = %response.artifact_id,
516 "Artifact stored"
517 );
518
519 Some(response.artifact_id)
520 }
521 Err(e) => {
522 tracing::warn!(
523 execution_id = %self.execution.id,
524 step_id = %step_id,
525 error = %e,
526 "Failed to store artifact"
527 );
528 None
529 }
530 }
531 }
532
533 pub async fn store_text_artifact(
535 &self,
536 step_id: &StepId,
537 name: impl Into<String>,
538 content: impl Into<String>,
539 ) -> Option<ArtifactId> {
540 self.store_artifact(
541 step_id,
542 name,
543 ArtifactType::Text,
544 content.into().into_bytes(),
545 )
546 .await
547 }
548
549 pub async fn store_json_artifact(
551 &self,
552 step_id: &StepId,
553 name: impl Into<String>,
554 value: &serde_json::Value,
555 ) -> Option<ArtifactId> {
556 let content = serde_json::to_vec_pretty(value).ok()?;
557 self.store_artifact(step_id, name, ArtifactType::Json, content)
558 .await
559 }
560
561 pub fn tenant_context(&self) -> &TenantContext {
563 &self.tenant_context
564 }
565
566 pub fn execution_id(&self) -> &ExecutionId {
568 &self.execution.id
569 }
570
571 pub fn state(&self) -> ExecutionState {
573 self.execution.state
574 }
575
576 pub fn emitter(&self) -> &EventEmitter {
578 &self.emitter
579 }
580
581 pub fn execution(&self) -> &Execution {
583 &self.execution
584 }
585
586 pub fn is_cancelled(&self) -> bool {
590 self.cancellation_token.is_cancelled()
591 }
592
593 pub fn cancel(&self, _reason: impl Into<String>) {
598 self.cancellation_token.cancel();
599 }
601
602 pub fn child_cancellation_token(&self) -> CancellationToken {
607 self.cancellation_token.child_token()
608 }
609
610 pub fn cancellation_token(&self) -> &CancellationToken {
620 &self.cancellation_token
621 }
622
623 pub fn dispatch(&mut self, action: ExecutionAction) -> Result<(), ReducerError> {
627 reduce(&mut self.execution, action.clone())?;
629
630 self.emit_event_for_action(&action);
632 self.persist_snapshot_best_effort();
634 self.emit_signal_best_effort(&action);
636
637 Ok(())
638 }
639
640 pub fn start(&mut self) -> Result<(), ReducerError> {
642 self.dispatch(ExecutionAction::Start)
643 }
644
645 pub fn begin_step(
647 &mut self,
648 step_type: StepType,
649 name: impl Into<String>,
650 parent_step_id: Option<StepId>,
651 ) -> Result<StepId, ReducerError> {
652 let step_id = StepId::new();
653 self.dispatch(ExecutionAction::StepStarted {
654 step_id: step_id.clone(),
655 parent_step_id,
656 step_type,
657 name: name.into(),
658 source: None,
659 })?;
660 Ok(step_id)
661 }
662
663 pub fn complete_step(
665 &mut self,
666 step_id: StepId,
667 output: Option<String>,
668 duration_ms: u64,
669 ) -> Result<(), ReducerError> {
670 self.dispatch(ExecutionAction::StepCompleted {
671 step_id,
672 output,
673 duration_ms,
674 })
675 }
676
677 pub fn fail_step(
679 &mut self,
680 step_id: StepId,
681 error: ExecutionError,
682 ) -> Result<(), ReducerError> {
683 self.dispatch(ExecutionAction::StepFailed { step_id, error })
684 }
685
686 pub fn fail_step_with_message(
688 &mut self,
689 step_id: StepId,
690 message: impl Into<String>,
691 ) -> Result<(), ReducerError> {
692 self.fail_step(step_id, ExecutionError::kernel_internal(message))
693 }
694
695 pub fn pause(&mut self, reason: impl Into<String>) -> Result<(), ReducerError> {
697 self.dispatch(ExecutionAction::Pause {
698 reason: reason.into(),
699 })
700 }
701
702 pub fn resume(&mut self) -> Result<(), ReducerError> {
704 self.dispatch(ExecutionAction::Resume)
705 }
706
707 pub fn wait_for(&mut self, reason: WaitReason) -> Result<(), ReducerError> {
709 self.dispatch(ExecutionAction::Wait { reason })
710 }
711
712 pub fn input_received(&mut self) -> Result<(), ReducerError> {
714 self.dispatch(ExecutionAction::InputReceived)
715 }
716
717 pub fn complete(&mut self, output: Option<String>) -> Result<(), ReducerError> {
719 self.dispatch(ExecutionAction::Complete { output })
720 }
721
722 pub fn fail(&mut self, error: ExecutionError) -> Result<(), ReducerError> {
724 self.dispatch(ExecutionAction::Fail { error })
725 }
726
727 pub fn fail_with_message(&mut self, message: impl Into<String>) -> Result<(), ReducerError> {
729 self.fail(ExecutionError::kernel_internal(message))
730 }
731
732 pub fn cancel_execution(&mut self, reason: impl Into<String>) -> Result<(), ReducerError> {
734 self.dispatch(ExecutionAction::Cancel {
735 reason: reason.into(),
736 })
737 }
738
739 pub async fn execute_graph(
750 &mut self,
751 graph: &CompiledGraph,
752 input: &str,
753 ) -> anyhow::Result<NodeState> {
754 self.start()?;
756
757 let mut state = NodeState::from_string(input);
758 let mut current_node = graph.entry_point().to_string();
759
760 let cancel_token = self.cancellation_token.clone();
762
763 loop {
764 if self.is_cancelled() {
766 self.cancel_execution("Cancelled by user")?;
767 anyhow::bail!("Execution cancelled");
768 }
769
770 let node = graph
772 .get_node(¤t_node)
773 .ok_or_else(|| anyhow::anyhow!("Node '{}' not found", current_node))?;
774
775 let step_start = Instant::now();
777 let step_id = self.begin_step(StepType::FunctionNode, current_node.clone(), None)?;
778
779 let node_future = node.execute(state.clone());
782 let result = tokio::select! {
783 biased; _ = cancel_token.cancelled() => {
786 let error = ExecutionError::kernel_internal("Cancelled during step execution")
788 .with_step_id(step_id.clone());
789 self.fail_step(step_id, error)?;
790 self.cancel_execution("Cancelled during step execution")?;
791 anyhow::bail!("Execution cancelled during step");
792 }
793 result = node_future => result,
794 };
795
796 let duration_ms = step_start.elapsed().as_millis() as u64;
797
798 match result {
799 Ok(new_state) => {
800 state = new_state;
801 self.complete_step(
802 step_id,
803 Some(state.as_str().unwrap_or_default().to_string()),
804 duration_ms,
805 )?;
806 }
807 Err(e) => {
808 let error = ExecutionError::kernel_internal(e.to_string())
809 .with_step_id(step_id.clone());
810 self.fail_step(step_id.clone(), error.clone())?;
811 self.fail(error)?;
812 return Err(e);
813 }
814 }
815
816 if let Some(action) = self.check_inbox()? {
818 match action {
819 InboxAction::Pause => {
820 tracing::info!(
824 execution_id = %self.execution.id,
825 "Execution paused via inbox, continuing in paused state"
826 );
827 }
828 InboxAction::Cancel(reason) => {
829 self.cancel_execution(&reason)?;
830 anyhow::bail!("Execution cancelled via inbox: {}", reason);
831 }
832 InboxAction::Continue => {
833 }
835 }
836 }
837
838 let output = state.as_str().unwrap_or_default();
840 let next = graph.get_next(¤t_node, output);
841
842 if next.is_empty() {
843 break;
844 }
845
846 match &next[0] {
847 crate::graph::EdgeTarget::End => break,
848 crate::graph::EdgeTarget::Node(n) => {
849 current_node = n.clone();
850 }
851 }
852 }
853
854 self.complete(Some(state.as_str().unwrap_or_default().to_string()))?;
856
857 Ok(state)
858 }
859
860 fn check_inbox(&mut self) -> Result<Option<InboxAction>, ReducerError> {
874 let inbox = match &self.inbox {
875 Some(inbox) => inbox.clone(),
876 None => return Ok(None),
877 };
878
879 let execution_ids_to_check = self.get_inbox_execution_ids();
881
882 let has_messages = execution_ids_to_check
884 .iter()
885 .any(|id| inbox.has_control_messages(id) || !inbox.is_empty(id));
886
887 if !has_messages {
888 return Ok(None);
889 }
890
891 let messages: Vec<InboxMessage> = execution_ids_to_check
893 .iter()
894 .flat_map(|id| inbox.drain_messages(id))
895 .collect();
896
897 let mut action = InboxAction::Continue;
898
899 for message in messages {
900 self.emit_inbox_event(&message);
902
903 match message {
904 InboxMessage::Control(ctrl) => {
905 match ctrl.action {
906 ControlAction::Pause => {
907 tracing::info!(
908 execution_id = %self.execution.id,
909 actor = %ctrl.actor,
910 reason = ?ctrl.reason,
911 "Inbox: Pause requested"
912 );
913 self.pause(
914 ctrl.reason
915 .unwrap_or_else(|| "Paused via inbox".to_string()),
916 )?;
917 action = InboxAction::Pause;
918 }
919 ControlAction::Resume => {
920 tracing::info!(
921 execution_id = %self.execution.id,
922 actor = %ctrl.actor,
923 "Inbox: Resume requested"
924 );
925 self.resume()?;
926 action = InboxAction::Continue;
927 }
928 ControlAction::Cancel => {
929 let reason = ctrl
930 .reason
931 .unwrap_or_else(|| "Cancelled via inbox".to_string());
932 tracing::info!(
933 execution_id = %self.execution.id,
934 actor = %ctrl.actor,
935 reason = %reason,
936 "Inbox: Cancel requested"
937 );
938 return Ok(Some(InboxAction::Cancel(reason)));
939 }
940 ControlAction::Checkpoint => {
941 tracing::info!(
942 execution_id = %self.execution.id,
943 "Inbox: Checkpoint requested"
944 );
945 }
947 ControlAction::Compact => {
948 tracing::info!(
949 execution_id = %self.execution.id,
950 "Inbox: Compact requested"
951 );
952 }
954 }
955 }
956 InboxMessage::Guidance(guidance) => {
957 tracing::info!(
958 execution_id = %self.execution.id,
959 from = ?guidance.from,
960 priority = ?guidance.priority,
961 content = %guidance.content,
962 "Inbox: Guidance received"
963 );
964 }
967 InboxMessage::Evidence(evidence) => {
968 tracing::info!(
969 execution_id = %self.execution.id,
970 source = ?evidence.source,
971 impact = ?evidence.impact,
972 title = %evidence.title,
973 "Inbox: Evidence received"
974 );
975 }
978 InboxMessage::A2a(a2a) => {
979 tracing::debug!(
980 execution_id = %self.execution.id,
981 from_agent = %a2a.from_agent,
982 message_type = %a2a.message_type,
983 "Inbox: A2A message received"
984 );
985 }
988 }
989 }
990
991 Ok(Some(action))
992 }
993
994 fn emit_inbox_event(&self, message: &InboxMessage) {
996 let event =
997 StreamEvent::inbox_message(&self.execution.id, message.id(), message.message_type());
998 self.emitter.emit(event);
999 }
1000
1001 #[cfg_attr(test, allow(dead_code))]
1012 pub(crate) fn get_inbox_execution_ids(&self) -> Vec<ExecutionId> {
1013 match &self.spawn_mode {
1014 Some(SpawnMode::Inline) => {
1016 vec![self.execution.id.clone()]
1017 }
1018 Some(SpawnMode::Child {
1020 inherit_inbox: true,
1021 ..
1022 }) => {
1023 let mut ids = vec![self.execution.id.clone()];
1024 if let Some(parent_id) = &self.parent_execution_id {
1025 ids.push(parent_id.clone());
1026 tracing::debug!(
1027 execution_id = %self.execution.id,
1028 parent_id = %parent_id,
1029 "Checking both parent and own inbox (inherit_inbox=true)"
1030 );
1031 }
1032 ids
1033 }
1034 Some(SpawnMode::Child {
1036 inherit_inbox: false,
1037 ..
1038 }) => {
1039 tracing::debug!(
1040 execution_id = %self.execution.id,
1041 "Using isolated inbox (inherit_inbox=false)"
1042 );
1043 vec![self.execution.id.clone()]
1044 }
1045 None => {
1047 vec![self.execution.id.clone()]
1048 }
1049 }
1050 }
1051
1052 fn emit_event_for_action(&self, action: &ExecutionAction) {
1054 let event = match action {
1055 ExecutionAction::Start => StreamEvent::execution_start(&self.execution.id),
1056 ExecutionAction::StepStarted {
1057 step_id,
1058 step_type,
1059 name,
1060 ..
1061 } => StreamEvent::step_start(
1062 &self.execution.id,
1063 step_id,
1064 step_type.clone(),
1065 name.clone(),
1066 ),
1067 ExecutionAction::StepCompleted {
1068 step_id,
1069 output,
1070 duration_ms,
1071 } => StreamEvent::step_end(&self.execution.id, step_id, output.clone(), *duration_ms),
1072 ExecutionAction::StepFailed { step_id, error } => {
1073 StreamEvent::step_failed(&self.execution.id, step_id, error.clone())
1074 }
1075 ExecutionAction::Pause { reason } => {
1076 StreamEvent::execution_paused(&self.execution.id, reason.clone())
1077 }
1078 ExecutionAction::Resume => StreamEvent::execution_resumed(&self.execution.id),
1079 ExecutionAction::Complete { output } => {
1080 let duration = self.execution.duration_ms().unwrap_or(0);
1081 StreamEvent::execution_end(&self.execution.id, output.clone(), duration)
1082 }
1083 ExecutionAction::Fail { error } => {
1084 StreamEvent::execution_failed(&self.execution.id, error.clone())
1085 }
1086 ExecutionAction::Cancel { reason } => {
1087 StreamEvent::execution_cancelled(&self.execution.id, reason.clone())
1088 }
1089 ExecutionAction::Wait { .. } | ExecutionAction::InputReceived => {
1090 return;
1092 }
1093 };
1094
1095 self.emitter.emit(event);
1096 }
1097
1098 fn persist_snapshot_best_effort(&self) {
1099 let Some(store) = self.state_store.as_ref() else {
1100 return;
1101 };
1102
1103 let current_step_id = self.execution.step_order.iter().rev().find_map(|id| {
1104 self.execution
1105 .steps
1106 .get(id)
1107 .and_then(|s| (s.state == StepState::Running).then_some(id.clone()))
1108 });
1109
1110 let step_outputs = self
1111 .execution
1112 .steps
1113 .iter()
1114 .filter_map(|(step_id, step)| {
1115 step.output
1116 .as_ref()
1117 .map(|output| (step_id.clone(), serde_json::Value::String(output.clone())))
1118 })
1119 .collect();
1120
1121 let mut snapshot = ExecutionSnapshot::with_user(
1122 self.execution.id.clone(),
1123 self.tenant_context.tenant_id.clone(),
1124 self.tenant_context.user_id.clone(),
1125 self.execution.state,
1126 self.execution.step_order.len() as u64,
1127 );
1128 snapshot.current_step_id = current_step_id;
1129 snapshot.step_outputs = step_outputs;
1130
1131 let store = Arc::clone(store);
1132 if let Ok(handle) = tokio::runtime::Handle::try_current() {
1133 handle.spawn(async move {
1134 if let Err(e) = store.save_snapshot(snapshot).await {
1135 tracing::debug!("State snapshot persistence failed: {}", e);
1136 }
1137 });
1138 }
1139 }
1140
1141 fn emit_signal_best_effort(&self, action: &ExecutionAction) {
1142 let Some(bus) = self.signal_bus.as_ref() else {
1143 return;
1144 };
1145
1146 let action_name = match action {
1147 ExecutionAction::Start => "start",
1148 ExecutionAction::StepStarted { .. } => "step_started",
1149 ExecutionAction::StepCompleted { .. } => "step_completed",
1150 ExecutionAction::StepFailed { .. } => "step_failed",
1151 ExecutionAction::Pause { .. } => "paused",
1152 ExecutionAction::Resume => "resumed",
1153 ExecutionAction::Complete { .. } => "completed",
1154 ExecutionAction::Fail { .. } => "failed",
1155 ExecutionAction::Cancel { .. } => "cancelled",
1156 ExecutionAction::Wait { .. } => "waiting",
1157 ExecutionAction::InputReceived => "input_received",
1158 };
1159
1160 let signal = serde_json::json!({
1161 "execution_id": self.execution.id.to_string(),
1162 "tenant_id": self.tenant_context.tenant_id.to_string(),
1163 "action": action_name,
1164 "state": format!("{:?}", self.execution.state),
1165 });
1166
1167 let signal_bytes = match serde_json::to_vec(&signal) {
1168 Ok(bytes) => bytes,
1169 Err(_) => return,
1170 };
1171
1172 let bus = Arc::clone(bus);
1173 if let Ok(handle) = tokio::runtime::Handle::try_current() {
1174 handle.spawn(async move {
1175 if let Err(e) = bus.emit("execution.lifecycle", &signal_bytes).await {
1176 tracing::debug!("Signal emit failed: {}", e);
1177 }
1178 });
1179 }
1180 }
1181
1182 pub async fn emit_protected(&self, event: StreamEvent) -> anyhow::Result<()> {
1195 if let Some(protected) = &self.protected_emitter {
1196 protected.emit(event).await?;
1197 } else {
1198 self.emitter.emit(event);
1199 }
1200 Ok(())
1201 }
1202
1203 pub fn emit_unprotected(&self, event: StreamEvent) {
1208 if let Some(protected) = &self.protected_emitter {
1209 protected.emit_unprotected(event);
1210 } else {
1211 self.emitter.emit(event);
1212 }
1213 }
1214
1215 pub fn has_protected_emitter(&self) -> bool {
1217 self.protected_emitter.is_some()
1218 }
1219
1220 pub fn protected_emitter(&self) -> Option<&ProtectedEventEmitter> {
1222 self.protected_emitter.as_ref()
1223 }
1224}
1225
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::ResourceLimits;
    use crate::TenantId;

    /// Builds a kernel for a brand-new tenant with default limits.
    fn fresh_kernel() -> ExecutionKernel {
        ExecutionKernel::new(TenantContext::new(TenantId::new()))
    }

    /// Shorthand for `SpawnMode::Child` without policy overrides.
    fn child_mode(background: bool, inherit_inbox: bool) -> SpawnMode {
        SpawnMode::Child {
            background,
            inherit_inbox,
            policies: None,
        }
    }

    #[tokio::test]
    async fn emits_warning_event_when_limits_near_threshold() {
        let tenant = TenantContext::new(TenantId::new()).with_limits(ResourceLimits {
            max_steps: 5,
            ..Default::default()
        });

        let mut kernel = ExecutionKernel::new(tenant);
        kernel.register_for_enforcement().await;

        // Three recorded steps against a limit of five.
        for _ in 0..3 {
            kernel.record_step_completed().await;
        }

        kernel.check_limits_before_step().await.unwrap();

        let events = kernel.emitter.drain();
        let saw_warning = events.iter().any(|e| {
            matches!(
                e,
                StreamEvent::PolicyDecision {
                    decision,
                    tool_name,
                    ..
                } if decision == "warn" && tool_name == "enforcement"
            )
        });
        assert!(saw_warning, "expected enforcement warning event");
    }

    #[test]
    fn test_kernel_with_spawn_mode_inline() {
        let kernel = fresh_kernel().with_spawn_mode(SpawnMode::Inline);

        assert_eq!(kernel.spawn_mode(), Some(&SpawnMode::Inline));
    }

    #[test]
    fn test_kernel_with_spawn_mode_child() {
        let kernel = fresh_kernel().with_spawn_mode(child_mode(true, true));

        let Some(SpawnMode::Child {
            background,
            inherit_inbox,
            ..
        }) = kernel.spawn_mode()
        else {
            panic!("Expected SpawnMode::Child");
        };
        assert!(*background);
        assert!(*inherit_inbox);
    }

    #[test]
    fn test_kernel_with_parent_execution_id() {
        let parent_id = ExecutionId::from_string("exec_parent_123");
        let kernel = fresh_kernel().with_parent_execution_id(parent_id.clone());

        assert_eq!(kernel.parent_execution_id(), Some(&parent_id));
    }

    #[test]
    fn test_kernel_default_no_spawn_mode() {
        let kernel = fresh_kernel();

        assert!(kernel.spawn_mode().is_none());
        assert!(kernel.parent_execution_id().is_none());
    }

    #[test]
    fn test_get_inbox_execution_ids_no_spawn_mode() {
        let kernel = fresh_kernel();

        let ids = kernel.get_inbox_execution_ids();
        assert_eq!(ids.len(), 1, "Should return only current execution ID");
        assert_eq!(ids[0], *kernel.execution_id());
    }

    #[test]
    fn test_get_inbox_execution_ids_inline_mode() {
        let kernel = fresh_kernel().with_spawn_mode(SpawnMode::Inline);

        let ids = kernel.get_inbox_execution_ids();
        assert_eq!(
            ids.len(),
            1,
            "Inline mode should check current execution only"
        );
        assert_eq!(ids[0], *kernel.execution_id());
    }

    #[test]
    fn test_get_inbox_execution_ids_child_isolated() {
        let kernel = fresh_kernel()
            .with_spawn_mode(child_mode(false, false))
            .with_parent_execution_id(ExecutionId::from_string("exec_parent"));

        let ids = kernel.get_inbox_execution_ids();
        assert_eq!(
            ids.len(),
            1,
            "Child with inherit_inbox=false should be isolated"
        );
        assert_eq!(
            ids[0],
            *kernel.execution_id(),
            "Should only check own inbox"
        );
    }

    #[test]
    fn test_get_inbox_execution_ids_child_inherit() {
        let parent_id = ExecutionId::from_string("exec_parent_inherit");
        let kernel = fresh_kernel()
            .with_spawn_mode(child_mode(false, true))
            .with_parent_execution_id(parent_id.clone());

        let ids = kernel.get_inbox_execution_ids();
        assert_eq!(
            ids.len(),
            2,
            "Child with inherit_inbox=true should check both inboxes"
        );
        assert!(
            ids.contains(kernel.execution_id()),
            "Should include own execution ID"
        );
        assert!(
            ids.contains(&parent_id),
            "Should include parent execution ID"
        );
    }

    #[test]
    fn test_get_inbox_execution_ids_child_inherit_no_parent_id() {
        // inherit_inbox is requested, but no parent id was ever supplied.
        let kernel = fresh_kernel().with_spawn_mode(child_mode(false, true));

        let ids = kernel.get_inbox_execution_ids();
        assert_eq!(
            ids.len(),
            1,
            "Without parent_execution_id, should only return own ID"
        );
        assert_eq!(ids[0], *kernel.execution_id());
    }

    #[test]
    fn test_get_inbox_execution_ids_child_background_isolated() {
        let kernel = fresh_kernel().with_spawn_mode(child_mode(true, false));

        let ids = kernel.get_inbox_execution_ids();
        assert_eq!(ids.len(), 1, "Background child with isolated inbox");
    }

    #[test]
    fn test_get_inbox_execution_ids_child_background_inherit() {
        let parent_id = ExecutionId::from_string("exec_background_parent");
        let kernel = fresh_kernel()
            .with_spawn_mode(child_mode(true, true))
            .with_parent_execution_id(parent_id.clone());

        let ids = kernel.get_inbox_execution_ids();
        assert_eq!(ids.len(), 2, "Background child can still inherit inbox");
        assert!(ids.contains(&parent_id));
    }
}