use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use crate::kernel::ids::{ExecutionId, SpawnMode, TenantId};
use crate::kernel::ExecutionError;
use super::target_binding::TargetBindingConfig;
use super::trigger::{RetryConfig, TriggerId};
/// How a background execution is run, as selected by `BackgroundExecutionConfig::mode`.
///
/// Serialized with snake_case variant names (`fire_and_forget`, `silent`, `deferred`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum BackgroundExecutionMode {
/// Default mode, and the mode assigned by `BackgroundExecution::from_spawn_mode`.
/// `is_silent` returns `true` for it; `requires_result` returns `false`.
#[default]
FireAndForget,
/// Mode assigned by `BackgroundExecution::from_trigger`.
/// `is_silent` returns `true` for it, and it is the only mode for which
/// `requires_result` returns `true`.
Silent,
/// Both `is_silent` and `requires_result` return `false` for this mode.
// NOTE(review): the name suggests the run is postponed, but nothing in this file
// shows when or how a deferred execution is scheduled — confirm with the executor.
Deferred,
}
/// Lifecycle state of a `BackgroundExecution`.
///
/// Serialized with snake_case variant names. `Completed`, `Failed`, `Cancelled`
/// and `Timeout` are the states for which `BackgroundExecution::is_finished`
/// returns `true`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum BackgroundExecutionStatus {
/// Default state; assigned by `BackgroundExecution::new`.
#[default]
Queued,
/// Assigned by `BackgroundExecution::start`.
Running,
/// Assigned by `BackgroundExecution::complete`; the only state for which
/// `is_success` returns `true`.
Completed,
/// Assigned by `BackgroundExecution::fail`.
Failed,
/// Assigned by `BackgroundExecution::cancel`.
Cancelled,
/// Assigned by `BackgroundExecution::timeout`.
Timeout,
}
/// Settings that control how a single background execution is run.
///
/// Field names are serialized in camelCase (e.g. `timeoutMs`, `callbackUrl`).
/// The optional fields are left out of the serialized form when they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BackgroundExecutionConfig {
/// Execution mode; falls back to `BackgroundExecutionMode::default()` when
/// absent from the deserialized input.
#[serde(default)]
pub mode: BackgroundExecutionMode,
/// Queue priority; falls back to `default_priority()` (50) when absent.
/// `BackgroundExecutionQueue::enqueue` places higher values ahead of lower ones.
#[serde(default = "default_priority")]
pub priority: u8,
/// Timeout in milliseconds; falls back to `default_timeout_ms()` (300000) when absent.
// NOTE(review): nothing in this file enforces the timeout — presumably the
// executor reads it and calls `BackgroundExecution::timeout`; confirm.
#[serde(default = "default_timeout_ms")]
pub timeout_ms: u64,
/// Optional target binding configuration.
#[serde(skip_serializing_if = "Option::is_none")]
pub target_binding: Option<TargetBindingConfig>,
/// Optional callback URL.
// NOTE(review): not read anywhere in this file; presumably notified when the
// execution finishes — verify against the consumer.
#[serde(skip_serializing_if = "Option::is_none")]
pub callback_url: Option<String>,
/// Optional retry configuration.
#[serde(skip_serializing_if = "Option::is_none")]
pub retry: Option<RetryConfig>,
}
/// Serde fallback for `BackgroundExecutionConfig::priority`; always returns `50`.
fn default_priority() -> u8 {
    const DEFAULT_PRIORITY: u8 = 50;
    DEFAULT_PRIORITY
}
/// Serde fallback for `BackgroundExecutionConfig::timeout_ms`; always returns
/// `300000` milliseconds (five minutes).
fn default_timeout_ms() -> u64 {
    const FIVE_MINUTES_MS: u64 = 5 * 60 * 1_000;
    FIVE_MINUTES_MS
}
impl Default for BackgroundExecutionConfig {
    /// Builds the configuration serde would produce from an empty object:
    /// default mode (`FireAndForget`), `default_priority()`, `default_timeout_ms()`,
    /// and every optional field unset.
    fn default() -> Self {
        // Reuse the serde fallback functions so programmatic and deserialized
        // defaults cannot drift apart.
        let priority = default_priority();
        let timeout_ms = default_timeout_ms();

        Self {
            // `FireAndForget` is the variant marked `#[default]` on the enum.
            mode: BackgroundExecutionMode::default(),
            priority,
            timeout_ms,
            target_binding: None,
            callback_url: None,
            retry: None,
        }
    }
}
/// A single background execution record: what to run, for which tenant, how it is
/// configured, and the state/timestamps/result accumulated as it progresses.
///
/// Field names are serialized in camelCase. Optional fields are left out of the
/// serialized form when they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BackgroundExecution {
/// Identifier of this execution; `new` generates it with `ExecutionId::new()`.
pub execution_id: ExecutionId,
/// Tenant the execution belongs to, as passed to the constructor.
pub tenant_id: TenantId,
/// Name of the callable this execution refers to.
pub callable_name: String,
/// Optional input string handed to the execution.
#[serde(skip_serializing_if = "Option::is_none")]
pub input: Option<String>,
/// Optional string key/value context.
#[serde(skip_serializing_if = "Option::is_none")]
pub context: Option<HashMap<String, String>>,
/// Configuration this execution was created with.
pub config: BackgroundExecutionConfig,
/// Current lifecycle state; falls back to `Queued` when absent from the
/// deserialized input.
#[serde(default)]
pub status: BackgroundExecutionStatus,
/// Time captured by `new` when the record was created.
pub queued_at: DateTime<Utc>,
/// Time captured by `start`; `None` until then.
#[serde(skip_serializing_if = "Option::is_none")]
pub started_at: Option<DateTime<Utc>>,
/// Time captured by `complete`, `fail`, `cancel` or `timeout`; `None` until then.
#[serde(skip_serializing_if = "Option::is_none")]
pub completed_at: Option<DateTime<Utc>>,
/// JSON output stored by `complete`.
#[serde(skip_serializing_if = "Option::is_none")]
pub output: Option<serde_json::Value>,
/// Error stored by `fail`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<ExecutionError>,
/// Falls back to `false` when absent from the deserialized input.
// NOTE(review): never set to `true` in this file; presumably flipped by whatever
// applies `config.target_binding` — confirm.
#[serde(default)]
pub target_binding_applied: bool,
/// Trigger that created this execution; set by `from_trigger`, otherwise `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub trigger_id: Option<TriggerId>,
/// Optional free-form JSON metadata; no constructor in this file populates it.
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<HashMap<String, serde_json::Value>>,
}
impl BackgroundExecution {
pub fn new(
tenant_id: TenantId,
callable_name: impl Into<String>,
config: BackgroundExecutionConfig,
) -> Self {
Self {
execution_id: ExecutionId::new(),
tenant_id,
callable_name: callable_name.into(),
input: None,
context: None,
config,
status: BackgroundExecutionStatus::Queued,
queued_at: Utc::now(),
started_at: None,
completed_at: None,
output: None,
error: None,
target_binding_applied: false,
trigger_id: None,
metadata: None,
}
}
pub fn from_trigger(
tenant_id: TenantId,
trigger_id: TriggerId,
callable_name: impl Into<String>,
input: Option<String>,
context: Option<HashMap<String, String>>,
target_binding: Option<TargetBindingConfig>,
retry: Option<RetryConfig>,
) -> Self {
let config = BackgroundExecutionConfig {
mode: BackgroundExecutionMode::Silent,
target_binding,
retry,
..Default::default()
};
let mut execution = Self::new(tenant_id, callable_name, config);
execution.trigger_id = Some(trigger_id);
execution.input = input;
execution.context = context;
execution
}
pub fn from_spawn_mode(
spawn_mode: &SpawnMode,
tenant_id: TenantId,
callable_name: impl Into<String>,
input: Option<String>,
context: Option<HashMap<String, String>>,
) -> Option<Self> {
match spawn_mode {
SpawnMode::Child {
background: true, ..
} => {
let config = BackgroundExecutionConfig {
mode: BackgroundExecutionMode::FireAndForget,
..Default::default()
};
let mut execution = Self::new(tenant_id, callable_name, config);
execution.input = input;
execution.context = context;
Some(execution)
}
SpawnMode::Child {
background: false, ..
} => {
None
}
SpawnMode::Inline => {
None
}
}
}
pub fn should_run_background(spawn_mode: &SpawnMode) -> bool {
matches!(
spawn_mode,
SpawnMode::Child {
background: true,
..
}
)
}
pub fn start(&mut self) {
self.status = BackgroundExecutionStatus::Running;
self.started_at = Some(Utc::now());
}
pub fn complete(&mut self, output: serde_json::Value) {
self.status = BackgroundExecutionStatus::Completed;
self.completed_at = Some(Utc::now());
self.output = Some(output);
}
pub fn fail(&mut self, error: ExecutionError) {
self.status = BackgroundExecutionStatus::Failed;
self.completed_at = Some(Utc::now());
self.error = Some(error);
}
pub fn cancel(&mut self) {
self.status = BackgroundExecutionStatus::Cancelled;
self.completed_at = Some(Utc::now());
}
pub fn timeout(&mut self) {
self.status = BackgroundExecutionStatus::Timeout;
self.completed_at = Some(Utc::now());
}
pub fn is_finished(&self) -> bool {
matches!(
self.status,
BackgroundExecutionStatus::Completed
| BackgroundExecutionStatus::Failed
| BackgroundExecutionStatus::Cancelled
| BackgroundExecutionStatus::Timeout
)
}
pub fn is_success(&self) -> bool {
self.status == BackgroundExecutionStatus::Completed
}
pub fn duration_ms(&self) -> Option<i64> {
match (self.started_at, self.completed_at) {
(Some(start), Some(end)) => Some((end - start).num_milliseconds()),
_ => None,
}
}
pub fn is_silent(&self) -> bool {
matches!(
self.config.mode,
BackgroundExecutionMode::Silent | BackgroundExecutionMode::FireAndForget
)
}
pub fn requires_result(&self) -> bool {
matches!(self.config.mode, BackgroundExecutionMode::Silent)
}
}
/// In-memory queue of pending background executions.
///
/// `enqueue` keeps entries ordered from highest to lowest `config.priority`, with
/// equal priorities kept in insertion order; `dequeue` removes from the front.
#[derive(Debug, Default)]
pub struct BackgroundExecutionQueue {
// Front of the deque is the next execution to hand out.
executions: std::collections::VecDeque<BackgroundExecution>,
}
impl BackgroundExecutionQueue {
    /// Creates an empty queue.
    pub fn new() -> Self {
        Self {
            executions: Default::default(),
        }
    }

    /// Inserts `execution` ahead of the first queued entry whose priority is
    /// strictly lower, or at the back when there is none.
    ///
    /// Entries with the same priority therefore keep their insertion order.
    pub fn enqueue(&mut self, execution: BackgroundExecution) {
        let incoming = execution.config.priority;
        // Count the leading entries that must stay ahead of the new one.
        let slot = self
            .executions
            .iter()
            .take_while(|queued| queued.config.priority >= incoming)
            .count();
        self.executions.insert(slot, execution);
    }

    /// Removes and returns the execution at the front of the queue, or `None`
    /// when the queue is empty.
    pub fn dequeue(&mut self) -> Option<BackgroundExecution> {
        let next = self.executions.pop_front();
        next
    }

    /// Borrows the execution at the front of the queue without removing it.
    pub fn peek(&self) -> Option<&BackgroundExecution> {
        let head = self.executions.front();
        head
    }

    /// Number of executions currently queued.
    pub fn len(&self) -> usize {
        self.executions.len()
    }

    /// Returns `true` when nothing is queued.
    pub fn is_empty(&self) -> bool {
        self.executions.is_empty()
    }

    /// Returns clones of the queued execution ids, in dequeue order.
    pub fn execution_ids(&self) -> Vec<ExecutionId> {
        let mut ids = Vec::with_capacity(self.executions.len());
        ids.extend(
            self.executions
                .iter()
                .map(|queued| queued.execution_id.clone()),
        );
        ids
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a queued execution named "test" whose config only overrides `mode`.
    fn execution_with_mode(mode: BackgroundExecutionMode) -> BackgroundExecution {
        let config = BackgroundExecutionConfig {
            mode,
            ..Default::default()
        };
        BackgroundExecution::new(TenantId::new(), "test", config)
    }

    /// Builds a queued execution named `name` whose config only overrides `priority`.
    fn execution_with_priority(name: &str, priority: u8) -> BackgroundExecution {
        let config = BackgroundExecutionConfig {
            priority,
            ..Default::default()
        };
        BackgroundExecution::new(TenantId::new(), name, config)
    }

    /// Builds a child spawn mode with the given `background` flag and no extras.
    fn child_spawn(background: bool) -> SpawnMode {
        SpawnMode::Child {
            background,
            inherit_inbox: false,
            policies: None,
        }
    }

    /// Each mode maps to the expected `is_silent` / `requires_result` pair.
    #[test]
    fn test_background_execution_modes() {
        // (mode, expected is_silent, expected requires_result)
        let expectations = [
            (BackgroundExecutionMode::FireAndForget, true, false),
            (BackgroundExecutionMode::Silent, true, true),
            (BackgroundExecutionMode::Deferred, false, false),
        ];

        for (mode, silent, needs_result) in expectations {
            let exec = execution_with_mode(mode.clone());
            assert_eq!(exec.is_silent(), silent, "is_silent for {:?}", mode);
            assert_eq!(
                exec.requires_result(),
                needs_result,
                "requires_result for {:?}",
                mode
            );
        }
    }

    /// Queued -> Running -> Completed updates status, timestamps and output.
    #[test]
    fn test_background_execution_lifecycle() {
        let mut exec = BackgroundExecution::new(
            TenantId::new(),
            "test",
            BackgroundExecutionConfig::default(),
        );
        assert_eq!(exec.status, BackgroundExecutionStatus::Queued);
        assert!(!exec.is_finished());

        exec.start();
        assert_eq!(exec.status, BackgroundExecutionStatus::Running);
        assert!(exec.started_at.is_some());

        exec.complete(serde_json::json!({"result": "success"}));
        assert_eq!(exec.status, BackgroundExecutionStatus::Completed);
        assert!(exec.is_finished());
        assert!(exec.is_success());
        assert!(exec.output.is_some());
        assert!(exec.duration_ms().is_some());
    }

    /// A higher-priority entry enqueued later is still dequeued first.
    #[test]
    fn test_background_execution_queue() {
        let mut queue = BackgroundExecutionQueue::new();
        assert!(queue.is_empty());

        queue.enqueue(execution_with_priority("low", 10));
        queue.enqueue(execution_with_priority("high", 90));
        assert_eq!(queue.len(), 2);

        assert_eq!(
            queue.dequeue().map(|e| e.callable_name).as_deref(),
            Some("high")
        );
        assert_eq!(
            queue.dequeue().map(|e| e.callable_name).as_deref(),
            Some("low")
        );
        assert!(queue.is_empty());
    }

    /// Only a background child spawn produces a background execution.
    #[test]
    fn test_spawn_mode_integration() {
        let tenant_id = TenantId::new();

        // Background child: an execution is produced in fire-and-forget mode.
        let background_child = child_spawn(true);
        assert!(BackgroundExecution::should_run_background(&background_child));
        let spawned = BackgroundExecution::from_spawn_mode(
            &background_child,
            tenant_id.clone(),
            "background_callable",
            Some("input data".to_string()),
            None,
        );
        assert!(spawned.is_some());
        let spawned = spawned.unwrap();
        assert_eq!(spawned.callable_name, "background_callable");
        assert_eq!(spawned.config.mode, BackgroundExecutionMode::FireAndForget);
        assert!(spawned.is_silent());

        // Foreground child: nothing is produced.
        let foreground_child = child_spawn(false);
        assert!(!BackgroundExecution::should_run_background(&foreground_child));
        let not_spawned = BackgroundExecution::from_spawn_mode(
            &foreground_child,
            tenant_id.clone(),
            "sync_callable",
            None,
            None,
        );
        assert!(not_spawned.is_none());

        // Inline spawn: nothing is produced.
        let inline = SpawnMode::Inline;
        assert!(!BackgroundExecution::should_run_background(&inline));
        let not_spawned = BackgroundExecution::from_spawn_mode(
            &inline,
            tenant_id,
            "inline_callable",
            None,
            None,
        );
        assert!(not_spawned.is_none());
    }
}