Skip to main content

enact_core/kernel/
kernel.rs

1//! ExecutionKernel - The core execution engine
2//!
3//! The kernel is the single point of execution. It:
4//! - Owns the Execution state
5//! - Applies actions through the reducer
6//! - Emits events for observers
7//! - Enforces invariants
8//!
9//! All execution MUST go through the kernel.
10//!
11//! ## ⚠️ CODE OWNERSHIP & FORBIDDEN PATTERNS
12//!
13//! **This module is the SINGLE SOURCE OF TRUTH for execution orchestration.**
14//!
15//! ### Code Ownership
16//! - Only `kernel::kernel` (ExecutionKernel) may orchestrate execution
17//! - ExecutionKernel owns the Execution state
18//! - All state transitions MUST go through `kernel::reducer::reduce()`
19//!
20//! ### Explicitly Forbidden Patterns
21//!
22//! These patterns are **forbidden forever**. If any of these happen, Enact loses its "Now" guarantee.
23//!
24//! 1. **Kernel calling providers directly** – Providers are resolved before kernel execution.
25//!    - The kernel receives resolved providers via `ExecutionRequest`, not provider names or registry lookups
26//!    - No global registries or dynamic discovery in kernel
27//!    - Provider resolution happens outside kernel (in runner/control plane)
28//!
29//! 2. **Streaming mutating state** – Streaming only subscribes and delivers events, never mutates execution state.
30//!    - Streaming is a read-only observer
31//!    - EventEmitter must not have access to ExecutionKernel, Reducer, or ExecutionState for mutation
32//!
33//! 3. **Signals driving execution** – Signals are hints only, never drive state transitions.
34//!    - SignalBus implementations must not have access to ExecutionKernel, Reducer, or ExecutionState
35//!    - Signals cannot directly trigger state changes
36//!
37//! 4. **Tools bypassing ToolPolicy** – All tool execution MUST go through ToolExecutor.
38//!    - ToolExecutor enforces ToolPolicy before every invocation
39//!    - No direct tool calls from kernel
40//!
41//! 5. **Context being optional** – TenantContext is REQUIRED for all executions.
42//!    - There is no "system" execution without a tenant
43//!    - All execution methods must require TenantContext
44//!
45//! 6. **IDs being redefined outside kernel** – Kernel is the ONLY source of truth for IDs.
46//!    - No other module may define ExecutionId, StepId, or other execution identifiers
47//!    - All IDs must come from `kernel::ids`
48//!
49//! ### Invariants Enforced
50//!
51//! - **Single source of truth**: Only the kernel may mutate `Execution` or `Step` state via `kernel::reducer`
52//! - **Policy-first enforcement**: All policy checks run before external providers; decisions are recorded as events
53//! - **Execution Service Parity**: Enact must always run as a decoupled execution service (HTTP/gRPC)
54//! - **Observable decision points**: Branching decisions emit `Decision` events with evidence artifacts
55//!
56//! @see docs/TECHNICAL/04-KERNEL_INVARIANTS.md
57//!
58//! ## Error Handling (feat-02)
59//!
60//! All failures use `ExecutionError` which provides:
61//! - Deterministic retry decisions
62//! - Structured error categories
63//! - Backoff hints
64//! - Idempotency tracking
65
66use super::artifact::{ArtifactStore, ArtifactType, PutArtifactRequest};
67use super::enforcement::{
68    EnforcementMiddleware, EnforcementResult, ExecutionUsage, LongRunningExecutionPolicy,
69};
70use super::error::ExecutionError;
71use super::execution_model::Execution;
72use super::execution_state::{ExecutionState, StepState, WaitReason};
73use super::ids::{ArtifactId, ExecutionId, SpawnMode, StepId, StepType};
74use super::persistence::{ExecutionSnapshot, StateStore};
75use super::reducer::{reduce, ExecutionAction, ReducerError};
76use crate::context::TenantContext;
77use crate::graph::{CompiledGraph, NodeState};
78use crate::inbox::{ControlAction, InboxMessage, InboxStore};
79use crate::signal::SignalBus;
80use crate::streaming::{EventEmitter, ProtectedEventEmitter, StreamEvent};
81use std::sync::Arc;
82use std::time::Instant;
83use tokio_util::sync::CancellationToken;
84
/// Outcome of draining the inbox after a step, telling the graph loop what to do next.
#[derive(Clone, Debug)]
enum InboxAction {
    /// Nothing actionable was received; keep walking the graph.
    Continue,
    /// A pause was requested.
    Pause,
    /// A cancel was requested; carries the human-readable reason.
    Cancel(String),
}
95
/// ExecutionKernel - the core execution engine
///
/// This is THE place where execution happens. Runner wires things up,
/// but kernel does the work. State transitions are applied by `dispatch`,
/// which runs the action through `reducer::reduce` and, only on success,
/// emits the corresponding event, persists a best-effort snapshot and emits
/// a best-effort signal.
///
/// ## Invariant: TenantContext is REQUIRED
///
/// Every execution MUST have a TenantContext. This ensures:
/// - Multi-tenant isolation
/// - Resource limit enforcement
/// - Billing attribution
/// - Audit compliance
///
/// Both constructors (`new`, `with_execution`) take a `TenantContext` by value,
/// so a kernel cannot be built without one.
pub struct ExecutionKernel {
    /// Current execution state (intended to change only via `dispatch`, per module docs)
    execution: Execution,
    /// Tenant context (REQUIRED - enforces multi-tenant isolation).
    /// Its `limits` feed the basic enforcement checks.
    tenant_context: TenantContext,
    /// Event emitter for streaming (read-only observers; never mutates state)
    emitter: EventEmitter,
    /// Protected event emitter for sensitive content (feat-guardrails)
    ///
    /// When configured, events with potentially sensitive content (step outputs,
    /// tool results) are meant to be emitted through this emitter, which applies
    /// protection processors (PII masking, content filtering, etc.) before streaming.
    /// `None` unless set via `with_protected_emitter`.
    protected_emitter: Option<ProtectedEventEmitter>,
    /// Cancellation token for cooperative async cancellation.
    /// Tripped by `cancel`; observed by `execute_graph` before and during each step.
    cancellation_token: CancellationToken,
    /// Inbox store for mid-execution guidance (INV-INBOX-*); checked after every
    /// step of `execute_graph`. `None` unless set via `with_inbox`.
    inbox: Option<Arc<dyn InboxStore>>,
    /// Optional mutable snapshot store (best-effort cache, non-authoritative)
    state_store: Option<Arc<dyn StateStore>>,
    /// Optional signal bus for non-authoritative lifecycle hints/notifications
    signal_bus: Option<Arc<dyn SignalBus>>,
    /// Artifact store for storing execution artifacts (feat-04).
    /// When `None`, the `store_*_artifact` methods return `None`.
    artifact_store: Option<Arc<dyn ArtifactStore>>,
    /// Enforcement middleware for resource limits (feat-03)
    ///
    /// Tracks usage (steps, tokens, cost, discovery depth) through the `record_*`
    /// methods and evaluates limits in `check_limits_before_step`, including the
    /// long-running execution controls. Defaults to a fresh middleware instance.
    enforcement: Arc<EnforcementMiddleware>,
    /// Long-running execution policy (agentic DAG controls)
    ///
    /// Controls discovery depth, discovered step limits, cost thresholds,
    /// and idle timeout for long-running agentic executions.
    /// Defaults to `LongRunningExecutionPolicy::standard()`.
    long_running_policy: LongRunningExecutionPolicy,
    /// Execution usage tracker; `Some` after `register_for_enforcement` has run.
    usage: Option<Arc<ExecutionUsage>>,
    /// SpawnMode - how this execution was spawned (for inbox routing)
    ///
    /// Controls inbox message routing:
    /// - Inline: shares parent's inbox
    /// - Child { inherit_inbox: true }: checks both parent and own inbox
    /// - Child { inherit_inbox: false }: isolated inbox
    ///
    /// @see docs/TECHNICAL/32-SPAWN-MODE.md
    spawn_mode: Option<SpawnMode>,
    /// Parent execution ID (if spawned as child)
    ///
    /// Used for inbox inheritance when spawn_mode is Child with inherit_inbox=true
    parent_execution_id: Option<ExecutionId>,
}
157
158impl ExecutionKernel {
159    /// Create a new kernel with a fresh execution
160    ///
161    /// ## Arguments
162    /// * `tenant_context` - REQUIRED tenant context for multi-tenant isolation
163    ///
164    /// ## Invariant
165    /// TenantContext is REQUIRED for all executions. There is no "system" execution
166    /// without a tenant - this is enforced at compile time.
167    pub fn new(tenant_context: TenantContext) -> Self {
168        let mut execution = Execution::new();
169        execution.tenant_id = Some(tenant_context.tenant_id.clone());
170        Self {
171            execution,
172            tenant_context,
173            emitter: EventEmitter::new(),
174            protected_emitter: None,
175            cancellation_token: CancellationToken::new(),
176            inbox: None,
177            state_store: None,
178            signal_bus: None,
179            artifact_store: None,
180            enforcement: Arc::new(EnforcementMiddleware::new()),
181            long_running_policy: LongRunningExecutionPolicy::standard(),
182            usage: None,
183            spawn_mode: None,
184            parent_execution_id: None,
185        }
186    }
187
188    /// Create a kernel with an existing execution (for replay)
189    ///
190    /// ## Arguments
191    /// * `execution` - Existing execution state to resume
192    /// * `tenant_context` - REQUIRED tenant context (must match execution's tenant)
193    pub fn with_execution(execution: Execution, tenant_context: TenantContext) -> Self {
194        // Note: In production, we should verify execution.tenant_id matches tenant_context.tenant_id
195        Self {
196            execution,
197            tenant_context,
198            emitter: EventEmitter::new(),
199            protected_emitter: None,
200            cancellation_token: CancellationToken::new(),
201            inbox: None,
202            state_store: None,
203            signal_bus: None,
204            artifact_store: None,
205            enforcement: Arc::new(EnforcementMiddleware::new()),
206            long_running_policy: LongRunningExecutionPolicy::standard(),
207            usage: None,
208            spawn_mode: None,
209            parent_execution_id: None,
210        }
211    }
212
213    /// Set the protected event emitter for content protection
214    ///
215    /// When set, events with potentially sensitive content (step outputs,
216    /// tool results, etc.) are emitted through this protected emitter which
217    /// applies protection processors before streaming.
218    ///
219    /// ## Usage
220    /// ```ignore
221    /// use enact_core::streaming::{ProtectedEventEmitter, PiiProtectionProcessor};
222    ///
223    /// let protected_emitter = ProtectedEventEmitter::new()
224    ///     .with_processor(Arc::new(PiiProtectionProcessor::new()));
225    ///
226    /// let kernel = ExecutionKernel::new(tenant_context)
227    ///     .with_protected_emitter(protected_emitter);
228    /// ```
229    pub fn with_protected_emitter(mut self, emitter: ProtectedEventEmitter) -> Self {
230        self.protected_emitter = Some(emitter);
231        self
232    }
233
234    /// Set the inbox store for mid-execution guidance
235    ///
236    /// When set, the kernel will check the inbox after every step (INV-INBOX-001)
237    /// and process messages in priority order (INV-INBOX-002).
238    pub fn with_inbox(mut self, inbox: Arc<dyn InboxStore>) -> Self {
239        self.inbox = Some(inbox);
240        self
241    }
242
243    /// Get the inbox store (if set)
244    pub fn inbox(&self) -> Option<&Arc<dyn InboxStore>> {
245        self.inbox.as_ref()
246    }
247
248    /// Set the state store for snapshot persistence (best-effort cache)
249    pub fn with_state_store(mut self, state_store: Arc<dyn StateStore>) -> Self {
250        self.state_store = Some(state_store);
251        self
252    }
253
254    /// Get state store (if configured)
255    pub fn state_store(&self) -> Option<&Arc<dyn StateStore>> {
256        self.state_store.as_ref()
257    }
258
259    /// Set the signal bus for optional execution lifecycle signaling
260    pub fn with_signal_bus(mut self, signal_bus: Arc<dyn SignalBus>) -> Self {
261        self.signal_bus = Some(signal_bus);
262        self
263    }
264
265    /// Get signal bus (if configured)
266    pub fn signal_bus(&self) -> Option<&Arc<dyn SignalBus>> {
267        self.signal_bus.as_ref()
268    }
269
270    /// Set the artifact store for storing execution artifacts
271    ///
272    /// When set, the kernel can store artifacts produced by steps.
273    /// Artifacts are emitted as events for audit trail.
274    pub fn with_artifact_store(mut self, store: Arc<dyn ArtifactStore>) -> Self {
275        self.artifact_store = Some(store);
276        self
277    }
278
279    /// Get the artifact store (if set)
280    pub fn artifact_store(&self) -> Option<&Arc<dyn ArtifactStore>> {
281        self.artifact_store.as_ref()
282    }
283
284    /// Set the enforcement middleware for resource limits
285    ///
286    /// When set, the kernel uses this enforcement middleware to track usage
287    /// and check limits before each step execution.
288    pub fn with_enforcement(mut self, enforcement: Arc<EnforcementMiddleware>) -> Self {
289        self.enforcement = enforcement;
290        self
291    }
292
293    /// Get the enforcement middleware
294    pub fn enforcement(&self) -> &Arc<EnforcementMiddleware> {
295        &self.enforcement
296    }
297
298    /// Set the long-running execution policy
299    ///
300    /// Controls discovery depth, discovered step limits, cost thresholds,
301    /// and idle timeout for long-running agentic executions.
302    pub fn with_long_running_policy(mut self, policy: LongRunningExecutionPolicy) -> Self {
303        self.long_running_policy = policy;
304        self
305    }
306
307    /// Get the long-running execution policy
308    pub fn long_running_policy(&self) -> &LongRunningExecutionPolicy {
309        &self.long_running_policy
310    }
311
312    /// Set the spawn mode for this execution
313    ///
314    /// Controls inbox message routing:
315    /// - Inline: shares parent's inbox (same ExecutionId)
316    /// - Child { inherit_inbox: true }: checks both parent and own inbox
317    /// - Child { inherit_inbox: false }: isolated inbox
318    ///
319    /// @see docs/TECHNICAL/32-SPAWN-MODE.md
320    pub fn with_spawn_mode(mut self, spawn_mode: SpawnMode) -> Self {
321        self.spawn_mode = Some(spawn_mode);
322        self
323    }
324
325    /// Get the spawn mode (if set)
326    pub fn spawn_mode(&self) -> Option<&SpawnMode> {
327        self.spawn_mode.as_ref()
328    }
329
330    /// Set the parent execution ID for child executions
331    ///
332    /// Used for inbox inheritance when spawn_mode is Child with inherit_inbox=true.
333    /// Must be set when spawning child executions that need to inherit parent's inbox.
334    pub fn with_parent_execution_id(mut self, parent_id: ExecutionId) -> Self {
335        self.parent_execution_id = Some(parent_id);
336        self
337    }
338
339    /// Get the parent execution ID (if set)
340    pub fn parent_execution_id(&self) -> Option<&ExecutionId> {
341        self.parent_execution_id.as_ref()
342    }
343
344    /// Get current usage snapshot for this execution
345    pub fn usage_snapshot(&self) -> Option<super::enforcement::UsageSnapshot> {
346        self.usage
347            .as_ref()
348            .map(|u| super::enforcement::UsageSnapshot::from(u.as_ref()))
349    }
350
351    /// Register this execution with the enforcement middleware
352    ///
353    /// Call this at the start of execution to begin tracking usage.
354    /// Returns the usage tracker for this execution.
355    pub async fn register_for_enforcement(&mut self) -> Arc<ExecutionUsage> {
356        let usage = self
357            .enforcement
358            .register_execution(
359                self.execution.id.clone(),
360                self.tenant_context.tenant_id.clone(),
361            )
362            .await;
363        self.usage = Some(Arc::clone(&usage));
364        usage
365    }
366
367    /// Unregister this execution from the enforcement middleware
368    ///
369    /// Call this at the end of execution to stop tracking usage.
370    pub async fn unregister_from_enforcement(&self) {
371        self.enforcement
372            .unregister_execution(&self.execution.id)
373            .await;
374    }
375
376    /// Check all resource limits before executing a step
377    ///
378    /// This checks:
379    /// - Basic limits (steps, tokens, wall time)
380    /// - Long-running limits (discovery depth, discovered steps, cost, idle)
381    ///
382    /// Returns an error if any limit is exceeded.
383    pub async fn check_limits_before_step(&self) -> Result<(), ExecutionError> {
384        // Check basic resource limits
385        let basic_result = self
386            .enforcement
387            .check_all_limits(&self.execution.id, &self.tenant_context.limits)
388            .await;
389
390        if let EnforcementResult::Blocked(violation) = basic_result {
391            return Err(violation.to_error());
392        }
393
394        // Check long-running execution limits
395        let long_running_result = self
396            .enforcement
397            .check_long_running_limits(&self.execution.id, &self.long_running_policy)
398            .await;
399
400        if let EnforcementResult::Blocked(violation) = long_running_result {
401            return Err(violation.to_error());
402        }
403
404        // Emit warning events if approaching limits
405        if self.enforcement.emit_warning_events_enabled() {
406            if let Some(warning) = match (&basic_result, &long_running_result) {
407                (EnforcementResult::Warning(w), _) => Some(w),
408                (_, EnforcementResult::Warning(w)) => Some(w),
409                _ => None,
410            } {
411                tracing::warn!(
412                    execution_id = %self.execution.id,
413                    warning_type = ?warning.warning_type,
414                    usage_percent = warning.usage_percent,
415                    message = %warning.message,
416                    "Enforcement warning"
417                );
418                self.emitter.emit(StreamEvent::policy_decision_warn(
419                    &self.execution.id,
420                    None,
421                    "enforcement",
422                    warning.message.clone(),
423                ));
424            }
425        }
426        Ok(())
427    }
428
429    /// Record step completion with the enforcement middleware
430    pub async fn record_step_completed(&self) {
431        self.enforcement.record_step(&self.execution.id).await;
432    }
433
434    /// Record token usage with the enforcement middleware
435    pub async fn record_token_usage(&self, input_tokens: u32, output_tokens: u32) {
436        self.enforcement
437            .record_tokens(&self.execution.id, input_tokens, output_tokens)
438            .await;
439    }
440
441    /// Record cost with the enforcement middleware
442    pub async fn record_cost(&self, cost_usd: f64) {
443        self.enforcement
444            .record_cost(&self.execution.id, cost_usd)
445            .await;
446    }
447
448    /// Record a discovered step with the enforcement middleware
449    pub async fn record_discovered_step(&self) {
450        self.enforcement
451            .record_discovered_step(&self.execution.id)
452            .await;
453    }
454
455    /// Push discovery depth (entering a sub-agent execution)
456    pub async fn push_discovery_depth(&self) {
457        self.enforcement
458            .push_discovery_depth(&self.execution.id)
459            .await;
460    }
461
462    /// Pop discovery depth (exiting a sub-agent execution)
463    pub async fn pop_discovery_depth(&self) {
464        self.enforcement
465            .pop_discovery_depth(&self.execution.id)
466            .await;
467    }
468
469    /// Store an artifact produced by a step
470    ///
471    /// This method:
472    /// 1. Stores the artifact in the artifact store
473    /// 2. Emits an ArtifactCreated event for audit trail
474    /// 3. Returns the artifact ID
475    ///
476    /// ## Arguments
477    /// * `step_id` - The step that produced this artifact
478    /// * `name` - Name of the artifact
479    /// * `artifact_type` - Type of artifact
480    /// * `content` - Raw content bytes
481    ///
482    /// ## Returns
483    /// The generated ArtifactId, or None if no artifact store is configured
484    pub async fn store_artifact(
485        &self,
486        step_id: &StepId,
487        name: impl Into<String>,
488        artifact_type: ArtifactType,
489        content: Vec<u8>,
490    ) -> Option<ArtifactId> {
491        let store = self.artifact_store.as_ref()?;
492
493        let request = PutArtifactRequest::new(
494            self.execution.id.clone(),
495            step_id.clone(),
496            name,
497            artifact_type,
498            content,
499        );
500
501        match store.put(request).await {
502            Ok(response) => {
503                // Emit ArtifactCreated event for audit trail
504                let artifact_type_str = format!("{:?}", artifact_type);
505                self.emitter.emit(StreamEvent::artifact_created(
506                    &self.execution.id,
507                    step_id,
508                    &response.artifact_id,
509                    artifact_type_str,
510                ));
511
512                tracing::debug!(
513                    execution_id = %self.execution.id,
514                    step_id = %step_id,
515                    artifact_id = %response.artifact_id,
516                    "Artifact stored"
517                );
518
519                Some(response.artifact_id)
520            }
521            Err(e) => {
522                tracing::warn!(
523                    execution_id = %self.execution.id,
524                    step_id = %step_id,
525                    error = %e,
526                    "Failed to store artifact"
527                );
528                None
529            }
530        }
531    }
532
533    /// Store a text artifact (convenience method)
534    pub async fn store_text_artifact(
535        &self,
536        step_id: &StepId,
537        name: impl Into<String>,
538        content: impl Into<String>,
539    ) -> Option<ArtifactId> {
540        self.store_artifact(
541            step_id,
542            name,
543            ArtifactType::Text,
544            content.into().into_bytes(),
545        )
546        .await
547    }
548
549    /// Store a JSON artifact (convenience method)
550    pub async fn store_json_artifact(
551        &self,
552        step_id: &StepId,
553        name: impl Into<String>,
554        value: &serde_json::Value,
555    ) -> Option<ArtifactId> {
556        let content = serde_json::to_vec_pretty(value).ok()?;
557        self.store_artifact(step_id, name, ArtifactType::Json, content)
558            .await
559    }
560
561    /// Get the tenant context
562    pub fn tenant_context(&self) -> &TenantContext {
563        &self.tenant_context
564    }
565
566    /// Get the execution ID
567    pub fn execution_id(&self) -> &ExecutionId {
568        &self.execution.id
569    }
570
571    /// Get the current execution state
572    pub fn state(&self) -> ExecutionState {
573        self.execution.state
574    }
575
576    /// Get the event emitter
577    pub fn emitter(&self) -> &EventEmitter {
578        &self.emitter
579    }
580
581    /// Get the execution reference
582    pub fn execution(&self) -> &Execution {
583        &self.execution
584    }
585
586    /// Check if cancelled
587    ///
588    /// Uses CancellationToken for proper async cancellation support.
589    pub fn is_cancelled(&self) -> bool {
590        self.cancellation_token.is_cancelled()
591    }
592
593    /// Cancel the execution
594    ///
595    /// This triggers cooperative cancellation of all async operations.
596    /// The actual state transition happens through dispatch.
597    pub fn cancel(&self, _reason: impl Into<String>) {
598        self.cancellation_token.cancel();
599        // Note: actual state transition happens through dispatch
600    }
601
602    /// Get a child cancellation token
603    ///
604    /// Child tokens are cancelled when the parent is cancelled,
605    /// but cancelling a child doesn't affect the parent.
606    pub fn child_cancellation_token(&self) -> CancellationToken {
607        self.cancellation_token.child_token()
608    }
609
610    /// Get the cancellation token for use in async operations
611    ///
612    /// Use this with `tokio::select!` to make async operations cancellable:
613    /// ```ignore
614    /// tokio::select! {
615    ///     _ = token.cancelled() => { /* handle cancellation */ }
616    ///     result = some_async_operation() => { /* handle result */ }
617    /// }
618    /// ```
619    pub fn cancellation_token(&self) -> &CancellationToken {
620        &self.cancellation_token
621    }
622
623    /// Dispatch an action to the reducer
624    ///
625    /// This is the ONLY way to change execution state.
626    pub fn dispatch(&mut self, action: ExecutionAction) -> Result<(), ReducerError> {
627        // Apply through reducer
628        reduce(&mut self.execution, action.clone())?;
629
630        // Emit corresponding event
631        self.emit_event_for_action(&action);
632        // Persist mutable snapshot cache (best-effort)
633        self.persist_snapshot_best_effort();
634        // Emit non-authoritative lifecycle signal (best-effort)
635        self.emit_signal_best_effort(&action);
636
637        Ok(())
638    }
639
640    /// Start execution
641    pub fn start(&mut self) -> Result<(), ReducerError> {
642        self.dispatch(ExecutionAction::Start)
643    }
644
645    /// Begin a step
646    pub fn begin_step(
647        &mut self,
648        step_type: StepType,
649        name: impl Into<String>,
650        parent_step_id: Option<StepId>,
651    ) -> Result<StepId, ReducerError> {
652        let step_id = StepId::new();
653        self.dispatch(ExecutionAction::StepStarted {
654            step_id: step_id.clone(),
655            parent_step_id,
656            step_type,
657            name: name.into(),
658            source: None,
659        })?;
660        Ok(step_id)
661    }
662
663    /// Complete a step
664    pub fn complete_step(
665        &mut self,
666        step_id: StepId,
667        output: Option<String>,
668        duration_ms: u64,
669    ) -> Result<(), ReducerError> {
670        self.dispatch(ExecutionAction::StepCompleted {
671            step_id,
672            output,
673            duration_ms,
674        })
675    }
676
677    /// Fail a step with a structured error (feat-02)
678    pub fn fail_step(
679        &mut self,
680        step_id: StepId,
681        error: ExecutionError,
682    ) -> Result<(), ReducerError> {
683        self.dispatch(ExecutionAction::StepFailed { step_id, error })
684    }
685
686    /// Fail a step with a simple message (creates a KernelInternal error)
687    pub fn fail_step_with_message(
688        &mut self,
689        step_id: StepId,
690        message: impl Into<String>,
691    ) -> Result<(), ReducerError> {
692        self.fail_step(step_id, ExecutionError::kernel_internal(message))
693    }
694
695    /// Pause execution
696    pub fn pause(&mut self, reason: impl Into<String>) -> Result<(), ReducerError> {
697        self.dispatch(ExecutionAction::Pause {
698            reason: reason.into(),
699        })
700    }
701
702    /// Resume execution
703    pub fn resume(&mut self) -> Result<(), ReducerError> {
704        self.dispatch(ExecutionAction::Resume)
705    }
706
707    /// Enter waiting state
708    pub fn wait_for(&mut self, reason: WaitReason) -> Result<(), ReducerError> {
709        self.dispatch(ExecutionAction::Wait { reason })
710    }
711
712    /// Signal that external input was received
713    pub fn input_received(&mut self) -> Result<(), ReducerError> {
714        self.dispatch(ExecutionAction::InputReceived)
715    }
716
717    /// Complete execution
718    pub fn complete(&mut self, output: Option<String>) -> Result<(), ReducerError> {
719        self.dispatch(ExecutionAction::Complete { output })
720    }
721
722    /// Fail execution with a structured error (feat-02)
723    pub fn fail(&mut self, error: ExecutionError) -> Result<(), ReducerError> {
724        self.dispatch(ExecutionAction::Fail { error })
725    }
726
727    /// Fail execution with a simple message (creates a KernelInternal error)
728    pub fn fail_with_message(&mut self, message: impl Into<String>) -> Result<(), ReducerError> {
729        self.fail(ExecutionError::kernel_internal(message))
730    }
731
732    /// Cancel execution
733    pub fn cancel_execution(&mut self, reason: impl Into<String>) -> Result<(), ReducerError> {
734        self.dispatch(ExecutionAction::Cancel {
735            reason: reason.into(),
736        })
737    }
738
739    /// Execute a compiled graph
740    ///
741    /// ## Invariants
742    /// - INV-INBOX-001: Inbox is checked after every step
743    /// - INV-INBOX-002: Control messages are processed first (via priority_order)
744    /// - INV-INBOX-003: Inbox events are emitted for audit trail
745    ///
746    /// ## Async Cancellation
747    /// Uses tokio::select! with CancellationToken for cooperative cancellation.
748    /// Node execution can be interrupted cleanly if cancellation is requested.
749    pub async fn execute_graph(
750        &mut self,
751        graph: &CompiledGraph,
752        input: &str,
753    ) -> anyhow::Result<NodeState> {
754        // Start execution
755        self.start()?;
756
757        let mut state = NodeState::from_string(input);
758        let mut current_node = graph.entry_point().to_string();
759
760        // Get cancellation token for use in select!
761        let cancel_token = self.cancellation_token.clone();
762
763        loop {
764            // Check for cancellation before each step
765            if self.is_cancelled() {
766                self.cancel_execution("Cancelled by user")?;
767                anyhow::bail!("Execution cancelled");
768            }
769
770            // Get the node
771            let node = graph
772                .get_node(&current_node)
773                .ok_or_else(|| anyhow::anyhow!("Node '{}' not found", current_node))?;
774
775            // Begin step
776            let step_start = Instant::now();
777            let step_id = self.begin_step(StepType::FunctionNode, current_node.clone(), None)?;
778
779            // Execute node with cancellation support using tokio::select!
780            // This allows long-running operations to be interrupted
781            let node_future = node.execute(state.clone());
782            let result = tokio::select! {
783                biased;  // Check cancellation first
784
785                _ = cancel_token.cancelled() => {
786                    // Cancellation requested during node execution
787                    let error = ExecutionError::kernel_internal("Cancelled during step execution")
788                        .with_step_id(step_id.clone());
789                    self.fail_step(step_id, error)?;
790                    self.cancel_execution("Cancelled during step execution")?;
791                    anyhow::bail!("Execution cancelled during step");
792                }
793                result = node_future => result,
794            };
795
796            let duration_ms = step_start.elapsed().as_millis() as u64;
797
798            match result {
799                Ok(new_state) => {
800                    state = new_state;
801                    self.complete_step(
802                        step_id,
803                        Some(state.as_str().unwrap_or_default().to_string()),
804                        duration_ms,
805                    )?;
806                }
807                Err(e) => {
808                    let error = ExecutionError::kernel_internal(e.to_string())
809                        .with_step_id(step_id.clone());
810                    self.fail_step(step_id.clone(), error.clone())?;
811                    self.fail(error)?;
812                    return Err(e);
813                }
814            }
815
816            // INV-INBOX-001: Check inbox after every step
817            if let Some(action) = self.check_inbox()? {
818                match action {
819                    InboxAction::Pause => {
820                        // Pause has already been applied via dispatch in check_inbox()
821                        // In a full implementation, we would suspend here and wait for resume
822                        // For now, we log and continue - the state machine is already in Paused
823                        tracing::info!(
824                            execution_id = %self.execution.id,
825                            "Execution paused via inbox, continuing in paused state"
826                        );
827                    }
828                    InboxAction::Cancel(reason) => {
829                        self.cancel_execution(&reason)?;
830                        anyhow::bail!("Execution cancelled via inbox: {}", reason);
831                    }
832                    InboxAction::Continue => {
833                        // Continue execution normally
834                    }
835                }
836            }
837
838            // Get next nodes
839            let output = state.as_str().unwrap_or_default();
840            let next = graph.get_next(&current_node, output);
841
842            if next.is_empty() {
843                break;
844            }
845
846            match &next[0] {
847                crate::graph::EdgeTarget::End => break,
848                crate::graph::EdgeTarget::Node(n) => {
849                    current_node = n.clone();
850                }
851            }
852        }
853
854        // Complete execution
855        self.complete(Some(state.as_str().unwrap_or_default().to_string()))?;
856
857        Ok(state)
858    }
859
860    /// Check inbox for messages and process them
861    ///
862    /// ## Invariants
863    /// - INV-INBOX-001: Called after every step
864    /// - INV-INBOX-002: Control messages processed first (via drain_messages sorting)
865    /// - INV-INBOX-003: All messages emit events for audit trail
866    /// - INV-SPAWN-002: Inbox inheritance based on SpawnMode
867    ///
868    /// ## SpawnMode Routing (@see docs/TECHNICAL/32-SPAWN-MODE.md)
869    ///
870    /// - Inline mode (default): Check current execution's inbox
871    /// - Child { inherit_inbox: true }: Check both parent and own inbox
872    /// - Child { inherit_inbox: false }: Check only own inbox (isolated)
873    fn check_inbox(&mut self) -> Result<Option<InboxAction>, ReducerError> {
874        let inbox = match &self.inbox {
875            Some(inbox) => inbox.clone(),
876            None => return Ok(None),
877        };
878
879        // Determine which execution IDs to check based on SpawnMode
880        let execution_ids_to_check = self.get_inbox_execution_ids();
881
882        // Fast path: check if any inbox has messages
883        let has_messages = execution_ids_to_check
884            .iter()
885            .any(|id| inbox.has_control_messages(id) || !inbox.is_empty(id));
886
887        if !has_messages {
888            return Ok(None);
889        }
890
891        // Drain messages from all applicable inboxes, sorted by priority (INV-INBOX-002)
892        let messages: Vec<InboxMessage> = execution_ids_to_check
893            .iter()
894            .flat_map(|id| inbox.drain_messages(id))
895            .collect();
896
897        let mut action = InboxAction::Continue;
898
899        for message in messages {
900            // INV-INBOX-003: Emit event for audit trail
901            self.emit_inbox_event(&message);
902
903            match message {
904                InboxMessage::Control(ctrl) => {
905                    match ctrl.action {
906                        ControlAction::Pause => {
907                            tracing::info!(
908                                execution_id = %self.execution.id,
909                                actor = %ctrl.actor,
910                                reason = ?ctrl.reason,
911                                "Inbox: Pause requested"
912                            );
913                            self.pause(
914                                ctrl.reason
915                                    .unwrap_or_else(|| "Paused via inbox".to_string()),
916                            )?;
917                            action = InboxAction::Pause;
918                        }
919                        ControlAction::Resume => {
920                            tracing::info!(
921                                execution_id = %self.execution.id,
922                                actor = %ctrl.actor,
923                                "Inbox: Resume requested"
924                            );
925                            self.resume()?;
926                            action = InboxAction::Continue;
927                        }
928                        ControlAction::Cancel => {
929                            let reason = ctrl
930                                .reason
931                                .unwrap_or_else(|| "Cancelled via inbox".to_string());
932                            tracing::info!(
933                                execution_id = %self.execution.id,
934                                actor = %ctrl.actor,
935                                reason = %reason,
936                                "Inbox: Cancel requested"
937                            );
938                            return Ok(Some(InboxAction::Cancel(reason)));
939                        }
940                        ControlAction::Checkpoint => {
941                            tracing::info!(
942                                execution_id = %self.execution.id,
943                                "Inbox: Checkpoint requested"
944                            );
945                            // Checkpoint is handled by the runner, not the kernel
946                        }
947                        ControlAction::Compact => {
948                            tracing::info!(
949                                execution_id = %self.execution.id,
950                                "Inbox: Compact requested"
951                            );
952                            // Compact is handled by the runner, not the kernel
953                        }
954                    }
955                }
956                InboxMessage::Guidance(guidance) => {
957                    tracing::info!(
958                        execution_id = %self.execution.id,
959                        from = ?guidance.from,
960                        priority = ?guidance.priority,
961                        content = %guidance.content,
962                        "Inbox: Guidance received"
963                    );
964                    // Guidance is logged but not acted upon in the kernel
965                    // The agent/LLM layer processes guidance
966                }
967                InboxMessage::Evidence(evidence) => {
968                    tracing::info!(
969                        execution_id = %self.execution.id,
970                        source = ?evidence.source,
971                        impact = ?evidence.impact,
972                        title = %evidence.title,
973                        "Inbox: Evidence received"
974                    );
975                    // Evidence is logged but not acted upon in the kernel
976                    // The agent/LLM layer processes evidence
977                }
978                InboxMessage::A2a(a2a) => {
979                    tracing::debug!(
980                        execution_id = %self.execution.id,
981                        from_agent = %a2a.from_agent,
982                        message_type = %a2a.message_type,
983                        "Inbox: A2A message received"
984                    );
985                    // A2A messages are logged but not acted upon in the kernel
986                    // The agent/LLM layer processes A2A messages
987                }
988            }
989        }
990
991        Ok(Some(action))
992    }
993
994    /// Emit an event for an inbox message (INV-INBOX-003)
995    fn emit_inbox_event(&self, message: &InboxMessage) {
996        let event =
997            StreamEvent::inbox_message(&self.execution.id, message.id(), message.message_type());
998        self.emitter.emit(event);
999    }
1000
1001    /// Determine which execution IDs to check for inbox messages based on SpawnMode
1002    ///
1003    /// ## SpawnMode Routing Rules (INV-SPAWN-002)
1004    ///
1005    /// - Inline mode: Check current execution's inbox (same as default)
1006    /// - Child { inherit_inbox: true }: Check both parent and own inbox
1007    /// - Child { inherit_inbox: false }: Check only own inbox (isolated)
1008    /// - No spawn_mode: Default to current execution only
1009    ///
1010    /// @see docs/TECHNICAL/32-SPAWN-MODE.md
1011    #[cfg_attr(test, allow(dead_code))]
1012    pub(crate) fn get_inbox_execution_ids(&self) -> Vec<ExecutionId> {
1013        match &self.spawn_mode {
1014            // Inline mode: use current execution only (inline shares parent's ExecutionId anyway)
1015            Some(SpawnMode::Inline) => {
1016                vec![self.execution.id.clone()]
1017            }
1018            // Child mode with inherit_inbox: check both parent and own inbox
1019            Some(SpawnMode::Child {
1020                inherit_inbox: true,
1021                ..
1022            }) => {
1023                let mut ids = vec![self.execution.id.clone()];
1024                if let Some(parent_id) = &self.parent_execution_id {
1025                    ids.push(parent_id.clone());
1026                    tracing::debug!(
1027                        execution_id = %self.execution.id,
1028                        parent_id = %parent_id,
1029                        "Checking both parent and own inbox (inherit_inbox=true)"
1030                    );
1031                }
1032                ids
1033            }
1034            // Child mode without inherit_inbox: isolated inbox
1035            Some(SpawnMode::Child {
1036                inherit_inbox: false,
1037                ..
1038            }) => {
1039                tracing::debug!(
1040                    execution_id = %self.execution.id,
1041                    "Using isolated inbox (inherit_inbox=false)"
1042                );
1043                vec![self.execution.id.clone()]
1044            }
1045            // No spawn mode set: default to current execution
1046            None => {
1047                vec![self.execution.id.clone()]
1048            }
1049        }
1050    }
1051
1052    /// Emit a stream event corresponding to an action
1053    fn emit_event_for_action(&self, action: &ExecutionAction) {
1054        let event = match action {
1055            ExecutionAction::Start => StreamEvent::execution_start(&self.execution.id),
1056            ExecutionAction::StepStarted {
1057                step_id,
1058                step_type,
1059                name,
1060                ..
1061            } => StreamEvent::step_start(
1062                &self.execution.id,
1063                step_id,
1064                step_type.clone(),
1065                name.clone(),
1066            ),
1067            ExecutionAction::StepCompleted {
1068                step_id,
1069                output,
1070                duration_ms,
1071            } => StreamEvent::step_end(&self.execution.id, step_id, output.clone(), *duration_ms),
1072            ExecutionAction::StepFailed { step_id, error } => {
1073                StreamEvent::step_failed(&self.execution.id, step_id, error.clone())
1074            }
1075            ExecutionAction::Pause { reason } => {
1076                StreamEvent::execution_paused(&self.execution.id, reason.clone())
1077            }
1078            ExecutionAction::Resume => StreamEvent::execution_resumed(&self.execution.id),
1079            ExecutionAction::Complete { output } => {
1080                let duration = self.execution.duration_ms().unwrap_or(0);
1081                StreamEvent::execution_end(&self.execution.id, output.clone(), duration)
1082            }
1083            ExecutionAction::Fail { error } => {
1084                StreamEvent::execution_failed(&self.execution.id, error.clone())
1085            }
1086            ExecutionAction::Cancel { reason } => {
1087                StreamEvent::execution_cancelled(&self.execution.id, reason.clone())
1088            }
1089            ExecutionAction::Wait { .. } | ExecutionAction::InputReceived => {
1090                // No specific stream events for these yet
1091                return;
1092            }
1093        };
1094
1095        self.emitter.emit(event);
1096    }
1097
1098    fn persist_snapshot_best_effort(&self) {
1099        let Some(store) = self.state_store.as_ref() else {
1100            return;
1101        };
1102
1103        let current_step_id = self.execution.step_order.iter().rev().find_map(|id| {
1104            self.execution
1105                .steps
1106                .get(id)
1107                .and_then(|s| (s.state == StepState::Running).then_some(id.clone()))
1108        });
1109
1110        let step_outputs = self
1111            .execution
1112            .steps
1113            .iter()
1114            .filter_map(|(step_id, step)| {
1115                step.output
1116                    .as_ref()
1117                    .map(|output| (step_id.clone(), serde_json::Value::String(output.clone())))
1118            })
1119            .collect();
1120
1121        let mut snapshot = ExecutionSnapshot::with_user(
1122            self.execution.id.clone(),
1123            self.tenant_context.tenant_id.clone(),
1124            self.tenant_context.user_id.clone(),
1125            self.execution.state,
1126            self.execution.step_order.len() as u64,
1127        );
1128        snapshot.current_step_id = current_step_id;
1129        snapshot.step_outputs = step_outputs;
1130
1131        let store = Arc::clone(store);
1132        if let Ok(handle) = tokio::runtime::Handle::try_current() {
1133            handle.spawn(async move {
1134                if let Err(e) = store.save_snapshot(snapshot).await {
1135                    tracing::debug!("State snapshot persistence failed: {}", e);
1136                }
1137            });
1138        }
1139    }
1140
1141    fn emit_signal_best_effort(&self, action: &ExecutionAction) {
1142        let Some(bus) = self.signal_bus.as_ref() else {
1143            return;
1144        };
1145
1146        let action_name = match action {
1147            ExecutionAction::Start => "start",
1148            ExecutionAction::StepStarted { .. } => "step_started",
1149            ExecutionAction::StepCompleted { .. } => "step_completed",
1150            ExecutionAction::StepFailed { .. } => "step_failed",
1151            ExecutionAction::Pause { .. } => "paused",
1152            ExecutionAction::Resume => "resumed",
1153            ExecutionAction::Complete { .. } => "completed",
1154            ExecutionAction::Fail { .. } => "failed",
1155            ExecutionAction::Cancel { .. } => "cancelled",
1156            ExecutionAction::Wait { .. } => "waiting",
1157            ExecutionAction::InputReceived => "input_received",
1158        };
1159
1160        let signal = serde_json::json!({
1161            "execution_id": self.execution.id.to_string(),
1162            "tenant_id": self.tenant_context.tenant_id.to_string(),
1163            "action": action_name,
1164            "state": format!("{:?}", self.execution.state),
1165        });
1166
1167        let signal_bytes = match serde_json::to_vec(&signal) {
1168            Ok(bytes) => bytes,
1169            Err(_) => return,
1170        };
1171
1172        let bus = Arc::clone(bus);
1173        if let Ok(handle) = tokio::runtime::Handle::try_current() {
1174            handle.spawn(async move {
1175                if let Err(e) = bus.emit("execution.lifecycle", &signal_bytes).await {
1176                    tracing::debug!("Signal emit failed: {}", e);
1177                }
1178            });
1179        }
1180    }
1181
1182    // =========================================================================
1183    // Protected Event Emission (P2 #2: ProtectedEventEmitter Integration)
1184    // =========================================================================
1185
1186    /// Emit an event with protection processing
1187    ///
1188    /// If a protected emitter is configured, the event passes through the
1189    /// protection pipeline before being emitted. Otherwise, falls back to
1190    /// the regular emitter.
1191    ///
1192    /// Use this for events that may contain sensitive content (step outputs,
1193    /// tool results, etc.).
1194    pub async fn emit_protected(&self, event: StreamEvent) -> anyhow::Result<()> {
1195        if let Some(protected) = &self.protected_emitter {
1196            protected.emit(event).await?;
1197        } else {
1198            self.emitter.emit(event);
1199        }
1200        Ok(())
1201    }
1202
1203    /// Emit an event without protection (control events, etc.)
1204    ///
1205    /// Use for events that are guaranteed safe (control signals, execution
1206    /// lifecycle events without content).
1207    pub fn emit_unprotected(&self, event: StreamEvent) {
1208        if let Some(protected) = &self.protected_emitter {
1209            protected.emit_unprotected(event);
1210        } else {
1211            self.emitter.emit(event);
1212        }
1213    }
1214
1215    /// Check if protected emitter is configured
1216    pub fn has_protected_emitter(&self) -> bool {
1217        self.protected_emitter.is_some()
1218    }
1219
1220    /// Get the protected emitter (if configured)
1221    pub fn protected_emitter(&self) -> Option<&ProtectedEventEmitter> {
1222        self.protected_emitter.as_ref()
1223    }
1224}
1225
1226// Note: ExecutionKernel does NOT implement Default because TenantContext is REQUIRED.
1227// This is intentional - every execution must have a tenant for multi-tenant isolation.
1228
#[cfg(test)]
mod tests {
    //! Unit tests for `ExecutionKernel`: enforcement warning events, SpawnMode
    //! builder methods, and SpawnMode-based inbox routing
    //! (`get_inbox_execution_ids`).

    use super::*;
    use crate::context::ResourceLimits;
    use crate::TenantId;

    /// With `max_steps = 5` and three completed steps, the pre-step limit check
    /// is expected to emit an enforcement "warn" policy decision.
    #[tokio::test]
    async fn emits_warning_event_when_limits_near_threshold() {
        let limits = ResourceLimits {
            max_steps: 5,
            ..Default::default()
        };
        let tenant = TenantContext::new(TenantId::new()).with_limits(limits);

        let mut kernel = ExecutionKernel::new(tenant);
        kernel.register_for_enforcement().await;

        // Record progress to reach warning threshold (80% at next step)
        kernel.record_step_completed().await;
        kernel.record_step_completed().await;
        kernel.record_step_completed().await;

        kernel.check_limits_before_step().await.unwrap();

        let events = kernel.emitter.drain();
        assert!(
            events.iter().any(|e| {
                matches!(
                    e,
                    StreamEvent::PolicyDecision {
                        decision,
                        tool_name,
                        ..
                    } if decision == "warn" && tool_name == "enforcement"
                )
            }),
            "expected enforcement warning event"
        );
    }

    // =========================================================================
    // SpawnMode Builder Tests
    // =========================================================================

    #[test]
    fn test_kernel_with_spawn_mode_inline() {
        let tenant = TenantContext::new(TenantId::new());
        let kernel = ExecutionKernel::new(tenant).with_spawn_mode(SpawnMode::Inline);

        assert_eq!(kernel.spawn_mode(), Some(&SpawnMode::Inline));
    }

    #[test]
    fn test_kernel_with_spawn_mode_child() {
        let tenant = TenantContext::new(TenantId::new());
        let kernel = ExecutionKernel::new(tenant).with_spawn_mode(SpawnMode::Child {
            background: true,
            inherit_inbox: true,
            policies: None,
        });

        assert!(kernel.spawn_mode().is_some());
        if let Some(SpawnMode::Child {
            background,
            inherit_inbox,
            ..
        }) = kernel.spawn_mode()
        {
            assert!(*background);
            assert!(*inherit_inbox);
        } else {
            panic!("Expected SpawnMode::Child");
        }
    }

    #[test]
    fn test_kernel_with_parent_execution_id() {
        let tenant = TenantContext::new(TenantId::new());
        let parent_id = ExecutionId::from_string("exec_parent_123");
        let kernel = ExecutionKernel::new(tenant).with_parent_execution_id(parent_id.clone());

        assert_eq!(kernel.parent_execution_id(), Some(&parent_id));
    }

    #[test]
    fn test_kernel_default_no_spawn_mode() {
        let tenant = TenantContext::new(TenantId::new());
        let kernel = ExecutionKernel::new(tenant);

        assert!(kernel.spawn_mode().is_none());
        assert!(kernel.parent_execution_id().is_none());
    }

    // =========================================================================
    // Inbox Routing by SpawnMode Tests
    // =========================================================================

    #[test]
    fn test_get_inbox_execution_ids_no_spawn_mode() {
        let tenant = TenantContext::new(TenantId::new());
        let kernel = ExecutionKernel::new(tenant);

        let ids = kernel.get_inbox_execution_ids();
        assert_eq!(ids.len(), 1, "Should return only current execution ID");
        assert_eq!(ids[0], *kernel.execution_id());
    }

    #[test]
    fn test_get_inbox_execution_ids_inline_mode() {
        let tenant = TenantContext::new(TenantId::new());
        let kernel = ExecutionKernel::new(tenant).with_spawn_mode(SpawnMode::Inline);

        let ids = kernel.get_inbox_execution_ids();
        assert_eq!(
            ids.len(),
            1,
            "Inline mode should check current execution only"
        );
        assert_eq!(ids[0], *kernel.execution_id());
    }

    #[test]
    fn test_get_inbox_execution_ids_child_isolated() {
        let tenant = TenantContext::new(TenantId::new());
        let parent_id = ExecutionId::from_string("exec_parent");
        let kernel = ExecutionKernel::new(tenant)
            .with_spawn_mode(SpawnMode::Child {
                background: false,
                inherit_inbox: false,
                policies: None,
            })
            .with_parent_execution_id(parent_id.clone());

        let ids = kernel.get_inbox_execution_ids();
        assert_eq!(
            ids.len(),
            1,
            "Child with inherit_inbox=false should be isolated"
        );
        assert_eq!(
            ids[0],
            *kernel.execution_id(),
            "Should only check own inbox"
        );
        assert!(
            !ids.contains(&parent_id),
            "Isolated child must not read the parent inbox"
        );
    }

    #[test]
    fn test_get_inbox_execution_ids_child_inherit() {
        let tenant = TenantContext::new(TenantId::new());
        let parent_id = ExecutionId::from_string("exec_parent_inherit");
        let kernel = ExecutionKernel::new(tenant)
            .with_spawn_mode(SpawnMode::Child {
                background: false,
                inherit_inbox: true,
                policies: None,
            })
            .with_parent_execution_id(parent_id.clone());

        let ids = kernel.get_inbox_execution_ids();
        assert_eq!(
            ids.len(),
            2,
            "Child with inherit_inbox=true should check both inboxes"
        );
        assert!(
            ids.contains(kernel.execution_id()),
            "Should include own execution ID"
        );
        assert!(
            ids.contains(&parent_id),
            "Should include parent execution ID"
        );
    }

    #[test]
    fn test_get_inbox_execution_ids_child_inherit_no_parent_id() {
        let tenant = TenantContext::new(TenantId::new());
        // Child with inherit_inbox=true but no parent_execution_id set
        let kernel = ExecutionKernel::new(tenant).with_spawn_mode(SpawnMode::Child {
            background: false,
            inherit_inbox: true,
            policies: None,
        });

        let ids = kernel.get_inbox_execution_ids();
        // Should gracefully handle missing parent_execution_id
        assert_eq!(
            ids.len(),
            1,
            "Without parent_execution_id, should only return own ID"
        );
        assert_eq!(ids[0], *kernel.execution_id());
    }

    #[test]
    fn test_get_inbox_execution_ids_child_background_isolated() {
        let tenant = TenantContext::new(TenantId::new());
        // A parent id is set on purpose. Without one, even an inheriting child
        // returns only its own id, so the test could not detect broken isolation.
        let parent_id = ExecutionId::from_string("exec_background_isolated_parent");
        let kernel = ExecutionKernel::new(tenant)
            .with_spawn_mode(SpawnMode::Child {
                background: true,     // Background child
                inherit_inbox: false, // Isolated
                policies: None,
            })
            .with_parent_execution_id(parent_id.clone());

        let ids = kernel.get_inbox_execution_ids();
        assert_eq!(ids.len(), 1, "Background child with isolated inbox");
        assert_eq!(
            ids[0],
            *kernel.execution_id(),
            "Should only check own inbox"
        );
        assert!(
            !ids.contains(&parent_id),
            "Isolated background child must not read the parent inbox"
        );
    }

    #[test]
    fn test_get_inbox_execution_ids_child_background_inherit() {
        let tenant = TenantContext::new(TenantId::new());
        let parent_id = ExecutionId::from_string("exec_background_parent");
        let kernel = ExecutionKernel::new(tenant)
            .with_spawn_mode(SpawnMode::Child {
                background: true,    // Background child
                inherit_inbox: true, // Inherits parent inbox
                policies: None,
            })
            .with_parent_execution_id(parent_id.clone());

        let ids = kernel.get_inbox_execution_ids();
        assert_eq!(ids.len(), 2, "Background child can still inherit inbox");
        assert!(ids.contains(kernel.execution_id()));
        assert!(ids.contains(&parent_id));
    }
}