use super::artifact::{ArtifactStore, ArtifactType, PutArtifactRequest};
use super::enforcement::{
EnforcementMiddleware, EnforcementResult, ExecutionUsage, LongRunningExecutionPolicy,
};
use super::error::ExecutionError;
use super::execution_model::Execution;
use super::execution_state::{ExecutionState, StepState, WaitReason};
use super::ids::{ArtifactId, ExecutionId, SpawnMode, StepId, StepType};
use super::persistence::{ExecutionSnapshot, StateStore};
use super::reducer::{reduce, ExecutionAction, ReducerError};
use crate::context::TenantContext;
use crate::graph::{CompiledGraph, NodeState};
use crate::inbox::{ControlAction, InboxMessage, InboxStore};
use crate::signal::SignalBus;
use crate::streaming::{EventEmitter, ProtectedEventEmitter, StreamEvent};
use std::sync::Arc;
use std::time::Instant;
use tokio_util::sync::CancellationToken;
#[derive(Debug, Clone)]
enum InboxAction {
Continue,
Pause,
Cancel(String),
}
pub struct ExecutionKernel {
execution: Execution,
tenant_context: TenantContext,
emitter: EventEmitter,
protected_emitter: Option<ProtectedEventEmitter>,
cancellation_token: CancellationToken,
inbox: Option<Arc<dyn InboxStore>>,
state_store: Option<Arc<dyn StateStore>>,
signal_bus: Option<Arc<dyn SignalBus>>,
artifact_store: Option<Arc<dyn ArtifactStore>>,
enforcement: Arc<EnforcementMiddleware>,
long_running_policy: LongRunningExecutionPolicy,
usage: Option<Arc<ExecutionUsage>>,
spawn_mode: Option<SpawnMode>,
parent_execution_id: Option<ExecutionId>,
}
impl ExecutionKernel {
pub fn new(tenant_context: TenantContext) -> Self {
let mut execution = Execution::new();
execution.tenant_id = Some(tenant_context.tenant_id.clone());
Self {
execution,
tenant_context,
emitter: EventEmitter::new(),
protected_emitter: None,
cancellation_token: CancellationToken::new(),
inbox: None,
state_store: None,
signal_bus: None,
artifact_store: None,
enforcement: Arc::new(EnforcementMiddleware::new()),
long_running_policy: LongRunningExecutionPolicy::standard(),
usage: None,
spawn_mode: None,
parent_execution_id: None,
}
}
pub fn with_execution(execution: Execution, tenant_context: TenantContext) -> Self {
Self {
execution,
tenant_context,
emitter: EventEmitter::new(),
protected_emitter: None,
cancellation_token: CancellationToken::new(),
inbox: None,
state_store: None,
signal_bus: None,
artifact_store: None,
enforcement: Arc::new(EnforcementMiddleware::new()),
long_running_policy: LongRunningExecutionPolicy::standard(),
usage: None,
spawn_mode: None,
parent_execution_id: None,
}
}
pub fn with_protected_emitter(mut self, emitter: ProtectedEventEmitter) -> Self {
self.protected_emitter = Some(emitter);
self
}
pub fn with_inbox(mut self, inbox: Arc<dyn InboxStore>) -> Self {
self.inbox = Some(inbox);
self
}
pub fn inbox(&self) -> Option<&Arc<dyn InboxStore>> {
self.inbox.as_ref()
}
pub fn with_state_store(mut self, state_store: Arc<dyn StateStore>) -> Self {
self.state_store = Some(state_store);
self
}
pub fn state_store(&self) -> Option<&Arc<dyn StateStore>> {
self.state_store.as_ref()
}
pub fn with_signal_bus(mut self, signal_bus: Arc<dyn SignalBus>) -> Self {
self.signal_bus = Some(signal_bus);
self
}
pub fn signal_bus(&self) -> Option<&Arc<dyn SignalBus>> {
self.signal_bus.as_ref()
}
pub fn with_artifact_store(mut self, store: Arc<dyn ArtifactStore>) -> Self {
self.artifact_store = Some(store);
self
}
pub fn artifact_store(&self) -> Option<&Arc<dyn ArtifactStore>> {
self.artifact_store.as_ref()
}
pub fn with_enforcement(mut self, enforcement: Arc<EnforcementMiddleware>) -> Self {
self.enforcement = enforcement;
self
}
pub fn enforcement(&self) -> &Arc<EnforcementMiddleware> {
&self.enforcement
}
pub fn with_long_running_policy(mut self, policy: LongRunningExecutionPolicy) -> Self {
self.long_running_policy = policy;
self
}
pub fn long_running_policy(&self) -> &LongRunningExecutionPolicy {
&self.long_running_policy
}
pub fn with_spawn_mode(mut self, spawn_mode: SpawnMode) -> Self {
self.spawn_mode = Some(spawn_mode);
self
}
pub fn spawn_mode(&self) -> Option<&SpawnMode> {
self.spawn_mode.as_ref()
}
pub fn with_parent_execution_id(mut self, parent_id: ExecutionId) -> Self {
self.parent_execution_id = Some(parent_id);
self
}
pub fn parent_execution_id(&self) -> Option<&ExecutionId> {
self.parent_execution_id.as_ref()
}
pub fn usage_snapshot(&self) -> Option<super::enforcement::UsageSnapshot> {
self.usage
.as_ref()
.map(|u| super::enforcement::UsageSnapshot::from(u.as_ref()))
}
pub async fn register_for_enforcement(&mut self) -> Arc<ExecutionUsage> {
let usage = self
.enforcement
.register_execution(
self.execution.id.clone(),
self.tenant_context.tenant_id.clone(),
)
.await;
self.usage = Some(Arc::clone(&usage));
usage
}
pub async fn unregister_from_enforcement(&self) {
self.enforcement
.unregister_execution(&self.execution.id)
.await;
}
pub async fn check_limits_before_step(&self) -> Result<(), ExecutionError> {
let basic_result = self
.enforcement
.check_all_limits(&self.execution.id, &self.tenant_context.limits)
.await;
if let EnforcementResult::Blocked(violation) = basic_result {
return Err(violation.to_error());
}
let long_running_result = self
.enforcement
.check_long_running_limits(&self.execution.id, &self.long_running_policy)
.await;
if let EnforcementResult::Blocked(violation) = long_running_result {
return Err(violation.to_error());
}
if self.enforcement.emit_warning_events_enabled() {
if let Some(warning) = match (&basic_result, &long_running_result) {
(EnforcementResult::Warning(w), _) => Some(w),
(_, EnforcementResult::Warning(w)) => Some(w),
_ => None,
} {
tracing::warn!(
execution_id = %self.execution.id,
warning_type = ?warning.warning_type,
usage_percent = warning.usage_percent,
message = %warning.message,
"Enforcement warning"
);
self.emitter.emit(StreamEvent::policy_decision_warn(
&self.execution.id,
None,
"enforcement",
warning.message.clone(),
));
}
}
Ok(())
}
pub async fn record_step_completed(&self) {
self.enforcement.record_step(&self.execution.id).await;
}
pub async fn record_token_usage(&self, input_tokens: u32, output_tokens: u32) {
self.enforcement
.record_tokens(&self.execution.id, input_tokens, output_tokens)
.await;
}
pub async fn record_cost(&self, cost_usd: f64) {
self.enforcement
.record_cost(&self.execution.id, cost_usd)
.await;
}
pub async fn record_discovered_step(&self) {
self.enforcement
.record_discovered_step(&self.execution.id)
.await;
}
pub async fn push_discovery_depth(&self) {
self.enforcement
.push_discovery_depth(&self.execution.id)
.await;
}
pub async fn pop_discovery_depth(&self) {
self.enforcement
.pop_discovery_depth(&self.execution.id)
.await;
}
pub async fn store_artifact(
&self,
step_id: &StepId,
name: impl Into<String>,
artifact_type: ArtifactType,
content: Vec<u8>,
) -> Option<ArtifactId> {
let store = self.artifact_store.as_ref()?;
let request = PutArtifactRequest::new(
self.execution.id.clone(),
step_id.clone(),
name,
artifact_type,
content,
);
match store.put(request).await {
Ok(response) => {
let artifact_type_str = format!("{:?}", artifact_type);
self.emitter.emit(StreamEvent::artifact_created(
&self.execution.id,
step_id,
&response.artifact_id,
artifact_type_str,
));
tracing::debug!(
execution_id = %self.execution.id,
step_id = %step_id,
artifact_id = %response.artifact_id,
"Artifact stored"
);
Some(response.artifact_id)
}
Err(e) => {
tracing::warn!(
execution_id = %self.execution.id,
step_id = %step_id,
error = %e,
"Failed to store artifact"
);
None
}
}
}
pub async fn store_text_artifact(
&self,
step_id: &StepId,
name: impl Into<String>,
content: impl Into<String>,
) -> Option<ArtifactId> {
self.store_artifact(
step_id,
name,
ArtifactType::Text,
content.into().into_bytes(),
)
.await
}
pub async fn store_json_artifact(
&self,
step_id: &StepId,
name: impl Into<String>,
value: &serde_json::Value,
) -> Option<ArtifactId> {
let content = serde_json::to_vec_pretty(value).ok()?;
self.store_artifact(step_id, name, ArtifactType::Json, content)
.await
}
pub fn tenant_context(&self) -> &TenantContext {
&self.tenant_context
}
pub fn execution_id(&self) -> &ExecutionId {
&self.execution.id
}
pub fn state(&self) -> ExecutionState {
self.execution.state
}
pub fn emitter(&self) -> &EventEmitter {
&self.emitter
}
pub fn execution(&self) -> &Execution {
&self.execution
}
pub fn is_cancelled(&self) -> bool {
self.cancellation_token.is_cancelled()
}
pub fn cancel(&self, _reason: impl Into<String>) {
self.cancellation_token.cancel();
}
pub fn child_cancellation_token(&self) -> CancellationToken {
self.cancellation_token.child_token()
}
pub fn cancellation_token(&self) -> &CancellationToken {
&self.cancellation_token
}
pub fn dispatch(&mut self, action: ExecutionAction) -> Result<(), ReducerError> {
reduce(&mut self.execution, action.clone())?;
self.emit_event_for_action(&action);
self.persist_snapshot_best_effort();
self.emit_signal_best_effort(&action);
Ok(())
}
pub fn start(&mut self) -> Result<(), ReducerError> {
self.dispatch(ExecutionAction::Start)
}
pub fn begin_step(
&mut self,
step_type: StepType,
name: impl Into<String>,
parent_step_id: Option<StepId>,
) -> Result<StepId, ReducerError> {
let step_id = StepId::new();
self.dispatch(ExecutionAction::StepStarted {
step_id: step_id.clone(),
parent_step_id,
step_type,
name: name.into(),
source: None,
})?;
Ok(step_id)
}
pub fn complete_step(
&mut self,
step_id: StepId,
output: Option<String>,
duration_ms: u64,
) -> Result<(), ReducerError> {
self.dispatch(ExecutionAction::StepCompleted {
step_id,
output,
duration_ms,
})
}
pub fn fail_step(
&mut self,
step_id: StepId,
error: ExecutionError,
) -> Result<(), ReducerError> {
self.dispatch(ExecutionAction::StepFailed { step_id, error })
}
pub fn fail_step_with_message(
&mut self,
step_id: StepId,
message: impl Into<String>,
) -> Result<(), ReducerError> {
self.fail_step(step_id, ExecutionError::kernel_internal(message))
}
pub fn pause(&mut self, reason: impl Into<String>) -> Result<(), ReducerError> {
self.dispatch(ExecutionAction::Pause {
reason: reason.into(),
})
}
pub fn resume(&mut self) -> Result<(), ReducerError> {
self.dispatch(ExecutionAction::Resume)
}
pub fn wait_for(&mut self, reason: WaitReason) -> Result<(), ReducerError> {
self.dispatch(ExecutionAction::Wait { reason })
}
pub fn input_received(&mut self) -> Result<(), ReducerError> {
self.dispatch(ExecutionAction::InputReceived)
}
pub fn complete(&mut self, output: Option<String>) -> Result<(), ReducerError> {
self.dispatch(ExecutionAction::Complete { output })
}
pub fn fail(&mut self, error: ExecutionError) -> Result<(), ReducerError> {
self.dispatch(ExecutionAction::Fail { error })
}
pub fn fail_with_message(&mut self, message: impl Into<String>) -> Result<(), ReducerError> {
self.fail(ExecutionError::kernel_internal(message))
}
pub fn cancel_execution(&mut self, reason: impl Into<String>) -> Result<(), ReducerError> {
self.dispatch(ExecutionAction::Cancel {
reason: reason.into(),
})
}
pub async fn execute_graph(
&mut self,
graph: &CompiledGraph,
input: &str,
) -> anyhow::Result<NodeState> {
self.start()?;
let mut state = NodeState::from_string(input);
let mut current_node = graph.entry_point().to_string();
let cancel_token = self.cancellation_token.clone();
loop {
if self.is_cancelled() {
self.cancel_execution("Cancelled by user")?;
anyhow::bail!("Execution cancelled");
}
let node = graph
.get_node(¤t_node)
.ok_or_else(|| anyhow::anyhow!("Node '{}' not found", current_node))?;
let step_start = Instant::now();
let step_id = self.begin_step(StepType::FunctionNode, current_node.clone(), None)?;
let node_future = node.execute(state.clone());
let result = tokio::select! {
biased;
_ = cancel_token.cancelled() => {
let error = ExecutionError::kernel_internal("Cancelled during step execution")
.with_step_id(step_id.clone());
self.fail_step(step_id, error)?;
self.cancel_execution("Cancelled during step execution")?;
anyhow::bail!("Execution cancelled during step");
}
result = node_future => result,
};
let duration_ms = step_start.elapsed().as_millis() as u64;
match result {
Ok(new_state) => {
state = new_state;
self.complete_step(
step_id,
Some(state.as_str().unwrap_or_default().to_string()),
duration_ms,
)?;
}
Err(e) => {
let error = ExecutionError::kernel_internal(e.to_string())
.with_step_id(step_id.clone());
self.fail_step(step_id.clone(), error.clone())?;
self.fail(error)?;
return Err(e);
}
}
if let Some(action) = self.check_inbox()? {
match action {
InboxAction::Pause => {
tracing::info!(
execution_id = %self.execution.id,
"Execution paused via inbox, continuing in paused state"
);
}
InboxAction::Cancel(reason) => {
self.cancel_execution(&reason)?;
anyhow::bail!("Execution cancelled via inbox: {}", reason);
}
InboxAction::Continue => {
}
}
}
let output = state.as_str().unwrap_or_default();
let next = graph.get_next(¤t_node, output);
if next.is_empty() {
break;
}
match &next[0] {
crate::graph::EdgeTarget::End => break,
crate::graph::EdgeTarget::Node(n) => {
current_node = n.clone();
}
}
}
self.complete(Some(state.as_str().unwrap_or_default().to_string()))?;
Ok(state)
}
fn check_inbox(&mut self) -> Result<Option<InboxAction>, ReducerError> {
let inbox = match &self.inbox {
Some(inbox) => inbox.clone(),
None => return Ok(None),
};
let execution_ids_to_check = self.get_inbox_execution_ids();
let has_messages = execution_ids_to_check
.iter()
.any(|id| inbox.has_control_messages(id) || !inbox.is_empty(id));
if !has_messages {
return Ok(None);
}
let messages: Vec<InboxMessage> = execution_ids_to_check
.iter()
.flat_map(|id| inbox.drain_messages(id))
.collect();
let mut action = InboxAction::Continue;
for message in messages {
self.emit_inbox_event(&message);
match message {
InboxMessage::Control(ctrl) => {
match ctrl.action {
ControlAction::Pause => {
tracing::info!(
execution_id = %self.execution.id,
actor = %ctrl.actor,
reason = ?ctrl.reason,
"Inbox: Pause requested"
);
self.pause(
ctrl.reason
.unwrap_or_else(|| "Paused via inbox".to_string()),
)?;
action = InboxAction::Pause;
}
ControlAction::Resume => {
tracing::info!(
execution_id = %self.execution.id,
actor = %ctrl.actor,
"Inbox: Resume requested"
);
self.resume()?;
action = InboxAction::Continue;
}
ControlAction::Cancel => {
let reason = ctrl
.reason
.unwrap_or_else(|| "Cancelled via inbox".to_string());
tracing::info!(
execution_id = %self.execution.id,
actor = %ctrl.actor,
reason = %reason,
"Inbox: Cancel requested"
);
return Ok(Some(InboxAction::Cancel(reason)));
}
ControlAction::Checkpoint => {
tracing::info!(
execution_id = %self.execution.id,
"Inbox: Checkpoint requested"
);
}
ControlAction::Compact => {
tracing::info!(
execution_id = %self.execution.id,
"Inbox: Compact requested"
);
}
}
}
InboxMessage::Guidance(guidance) => {
tracing::info!(
execution_id = %self.execution.id,
from = ?guidance.from,
priority = ?guidance.priority,
content = %guidance.content,
"Inbox: Guidance received"
);
}
InboxMessage::Evidence(evidence) => {
tracing::info!(
execution_id = %self.execution.id,
source = ?evidence.source,
impact = ?evidence.impact,
title = %evidence.title,
"Inbox: Evidence received"
);
}
InboxMessage::A2a(a2a) => {
tracing::debug!(
execution_id = %self.execution.id,
from_agent = %a2a.from_agent,
message_type = %a2a.message_type,
"Inbox: A2A message received"
);
}
}
}
Ok(Some(action))
}
fn emit_inbox_event(&self, message: &InboxMessage) {
let event =
StreamEvent::inbox_message(&self.execution.id, message.id(), message.message_type());
self.emitter.emit(event);
}
#[cfg_attr(test, allow(dead_code))]
pub(crate) fn get_inbox_execution_ids(&self) -> Vec<ExecutionId> {
match &self.spawn_mode {
Some(SpawnMode::Inline) => {
vec![self.execution.id.clone()]
}
Some(SpawnMode::Child {
inherit_inbox: true,
..
}) => {
let mut ids = vec![self.execution.id.clone()];
if let Some(parent_id) = &self.parent_execution_id {
ids.push(parent_id.clone());
tracing::debug!(
execution_id = %self.execution.id,
parent_id = %parent_id,
"Checking both parent and own inbox (inherit_inbox=true)"
);
}
ids
}
Some(SpawnMode::Child {
inherit_inbox: false,
..
}) => {
tracing::debug!(
execution_id = %self.execution.id,
"Using isolated inbox (inherit_inbox=false)"
);
vec![self.execution.id.clone()]
}
None => {
vec![self.execution.id.clone()]
}
}
}
fn emit_event_for_action(&self, action: &ExecutionAction) {
let event = match action {
ExecutionAction::Start => StreamEvent::execution_start(&self.execution.id),
ExecutionAction::StepStarted {
step_id,
step_type,
name,
..
} => StreamEvent::step_start(
&self.execution.id,
step_id,
step_type.clone(),
name.clone(),
),
ExecutionAction::StepCompleted {
step_id,
output,
duration_ms,
} => StreamEvent::step_end(&self.execution.id, step_id, output.clone(), *duration_ms),
ExecutionAction::StepFailed { step_id, error } => {
StreamEvent::step_failed(&self.execution.id, step_id, error.clone())
}
ExecutionAction::Pause { reason } => {
StreamEvent::execution_paused(&self.execution.id, reason.clone())
}
ExecutionAction::Resume => StreamEvent::execution_resumed(&self.execution.id),
ExecutionAction::Complete { output } => {
let duration = self.execution.duration_ms().unwrap_or(0);
StreamEvent::execution_end(&self.execution.id, output.clone(), duration)
}
ExecutionAction::Fail { error } => {
StreamEvent::execution_failed(&self.execution.id, error.clone())
}
ExecutionAction::Cancel { reason } => {
StreamEvent::execution_cancelled(&self.execution.id, reason.clone())
}
ExecutionAction::Wait { .. } | ExecutionAction::InputReceived => {
return;
}
};
self.emitter.emit(event);
}
fn persist_snapshot_best_effort(&self) {
let Some(store) = self.state_store.as_ref() else {
return;
};
let current_step_id = self.execution.step_order.iter().rev().find_map(|id| {
self.execution
.steps
.get(id)
.and_then(|s| (s.state == StepState::Running).then_some(id.clone()))
});
let step_outputs = self
.execution
.steps
.iter()
.filter_map(|(step_id, step)| {
step.output
.as_ref()
.map(|output| (step_id.clone(), serde_json::Value::String(output.clone())))
})
.collect();
let mut snapshot = ExecutionSnapshot::with_user(
self.execution.id.clone(),
self.tenant_context.tenant_id.clone(),
self.tenant_context.user_id.clone(),
self.execution.state,
self.execution.step_order.len() as u64,
);
snapshot.current_step_id = current_step_id;
snapshot.step_outputs = step_outputs;
let store = Arc::clone(store);
if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle.spawn(async move {
if let Err(e) = store.save_snapshot(snapshot).await {
tracing::debug!("State snapshot persistence failed: {}", e);
}
});
}
}
fn emit_signal_best_effort(&self, action: &ExecutionAction) {
let Some(bus) = self.signal_bus.as_ref() else {
return;
};
let action_name = match action {
ExecutionAction::Start => "start",
ExecutionAction::StepStarted { .. } => "step_started",
ExecutionAction::StepCompleted { .. } => "step_completed",
ExecutionAction::StepFailed { .. } => "step_failed",
ExecutionAction::Pause { .. } => "paused",
ExecutionAction::Resume => "resumed",
ExecutionAction::Complete { .. } => "completed",
ExecutionAction::Fail { .. } => "failed",
ExecutionAction::Cancel { .. } => "cancelled",
ExecutionAction::Wait { .. } => "waiting",
ExecutionAction::InputReceived => "input_received",
};
let signal = serde_json::json!({
"execution_id": self.execution.id.to_string(),
"tenant_id": self.tenant_context.tenant_id.to_string(),
"action": action_name,
"state": format!("{:?}", self.execution.state),
});
let signal_bytes = match serde_json::to_vec(&signal) {
Ok(bytes) => bytes,
Err(_) => return,
};
let bus = Arc::clone(bus);
if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle.spawn(async move {
if let Err(e) = bus.emit("execution.lifecycle", &signal_bytes).await {
tracing::debug!("Signal emit failed: {}", e);
}
});
}
}
pub async fn emit_protected(&self, event: StreamEvent) -> anyhow::Result<()> {
if let Some(protected) = &self.protected_emitter {
protected.emit(event).await?;
} else {
self.emitter.emit(event);
}
Ok(())
}
pub fn emit_unprotected(&self, event: StreamEvent) {
if let Some(protected) = &self.protected_emitter {
protected.emit_unprotected(event);
} else {
self.emitter.emit(event);
}
}
pub fn has_protected_emitter(&self) -> bool {
self.protected_emitter.is_some()
}
pub fn protected_emitter(&self) -> Option<&ProtectedEventEmitter> {
self.protected_emitter.as_ref()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::context::ResourceLimits;
use crate::TenantId;
#[tokio::test]
async fn emits_warning_event_when_limits_near_threshold() {
let limits = ResourceLimits {
max_steps: 5,
..Default::default()
};
let tenant = TenantContext::new(TenantId::new()).with_limits(limits);
let mut kernel = ExecutionKernel::new(tenant);
kernel.register_for_enforcement().await;
kernel.record_step_completed().await;
kernel.record_step_completed().await;
kernel.record_step_completed().await;
kernel.check_limits_before_step().await.unwrap();
let events = kernel.emitter.drain();
assert!(
events.iter().any(|e| {
matches!(
e,
StreamEvent::PolicyDecision {
decision,
tool_name,
..
} if decision == "warn" && tool_name == "enforcement"
)
}),
"expected enforcement warning event"
);
}
#[test]
fn test_kernel_with_spawn_mode_inline() {
let tenant = TenantContext::new(TenantId::new());
let kernel = ExecutionKernel::new(tenant).with_spawn_mode(SpawnMode::Inline);
assert!(kernel.spawn_mode().is_some());
assert_eq!(*kernel.spawn_mode().unwrap(), SpawnMode::Inline);
}
#[test]
fn test_kernel_with_spawn_mode_child() {
let tenant = TenantContext::new(TenantId::new());
let kernel = ExecutionKernel::new(tenant).with_spawn_mode(SpawnMode::Child {
background: true,
inherit_inbox: true,
policies: None,
});
assert!(kernel.spawn_mode().is_some());
if let Some(SpawnMode::Child {
background,
inherit_inbox,
..
}) = kernel.spawn_mode()
{
assert!(*background);
assert!(*inherit_inbox);
} else {
panic!("Expected SpawnMode::Child");
}
}
#[test]
fn test_kernel_with_parent_execution_id() {
let tenant = TenantContext::new(TenantId::new());
let parent_id = ExecutionId::from_string("exec_parent_123");
let kernel = ExecutionKernel::new(tenant).with_parent_execution_id(parent_id.clone());
assert!(kernel.parent_execution_id().is_some());
assert_eq!(*kernel.parent_execution_id().unwrap(), parent_id);
}
#[test]
fn test_kernel_default_no_spawn_mode() {
let tenant = TenantContext::new(TenantId::new());
let kernel = ExecutionKernel::new(tenant);
assert!(kernel.spawn_mode().is_none());
assert!(kernel.parent_execution_id().is_none());
}
#[test]
fn test_get_inbox_execution_ids_no_spawn_mode() {
let tenant = TenantContext::new(TenantId::new());
let kernel = ExecutionKernel::new(tenant);
let ids = kernel.get_inbox_execution_ids();
assert_eq!(ids.len(), 1, "Should return only current execution ID");
assert_eq!(ids[0], *kernel.execution_id());
}
#[test]
fn test_get_inbox_execution_ids_inline_mode() {
let tenant = TenantContext::new(TenantId::new());
let kernel = ExecutionKernel::new(tenant).with_spawn_mode(SpawnMode::Inline);
let ids = kernel.get_inbox_execution_ids();
assert_eq!(
ids.len(),
1,
"Inline mode should check current execution only"
);
assert_eq!(ids[0], *kernel.execution_id());
}
#[test]
fn test_get_inbox_execution_ids_child_isolated() {
let tenant = TenantContext::new(TenantId::new());
let parent_id = ExecutionId::from_string("exec_parent");
let kernel = ExecutionKernel::new(tenant)
.with_spawn_mode(SpawnMode::Child {
background: false,
inherit_inbox: false,
policies: None,
})
.with_parent_execution_id(parent_id);
let ids = kernel.get_inbox_execution_ids();
assert_eq!(
ids.len(),
1,
"Child with inherit_inbox=false should be isolated"
);
assert_eq!(
ids[0],
*kernel.execution_id(),
"Should only check own inbox"
);
}
#[test]
fn test_get_inbox_execution_ids_child_inherit() {
let tenant = TenantContext::new(TenantId::new());
let parent_id = ExecutionId::from_string("exec_parent_inherit");
let kernel = ExecutionKernel::new(tenant)
.with_spawn_mode(SpawnMode::Child {
background: false,
inherit_inbox: true,
policies: None,
})
.with_parent_execution_id(parent_id.clone());
let ids = kernel.get_inbox_execution_ids();
assert_eq!(
ids.len(),
2,
"Child with inherit_inbox=true should check both inboxes"
);
assert!(
ids.contains(kernel.execution_id()),
"Should include own execution ID"
);
assert!(
ids.contains(&parent_id),
"Should include parent execution ID"
);
}
#[test]
fn test_get_inbox_execution_ids_child_inherit_no_parent_id() {
let tenant = TenantContext::new(TenantId::new());
let kernel = ExecutionKernel::new(tenant).with_spawn_mode(SpawnMode::Child {
background: false,
inherit_inbox: true,
policies: None,
});
let ids = kernel.get_inbox_execution_ids();
assert_eq!(
ids.len(),
1,
"Without parent_execution_id, should only return own ID"
);
assert_eq!(ids[0], *kernel.execution_id());
}
#[test]
fn test_get_inbox_execution_ids_child_background_isolated() {
let tenant = TenantContext::new(TenantId::new());
let kernel = ExecutionKernel::new(tenant).with_spawn_mode(SpawnMode::Child {
background: true, inherit_inbox: false, policies: None,
});
let ids = kernel.get_inbox_execution_ids();
assert_eq!(ids.len(), 1, "Background child with isolated inbox");
}
#[test]
fn test_get_inbox_execution_ids_child_background_inherit() {
let tenant = TenantContext::new(TenantId::new());
let parent_id = ExecutionId::from_string("exec_background_parent");
let kernel = ExecutionKernel::new(tenant)
.with_spawn_mode(SpawnMode::Child {
background: true, inherit_inbox: true, policies: None,
})
.with_parent_execution_id(parent_id.clone());
let ids = kernel.get_inbox_execution_ids();
assert_eq!(ids.len(), 2, "Background child can still inherit inbox");
assert!(ids.contains(&parent_id));
}
}