Skip to main content

enact_core/background/
executor.rs

//! Background Executor - Runs callables in background mode
//!
//! The executor handles different background execution modes:
//! - FireAndForget: Don't wait for the result, no streaming
//! - Silent: Wait for the result, but suppress streaming events
//! - Deferred: Queue for later execution
//!
//! @see packages/enact-schemas/src/execution.schemas.ts
9
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13
14use crate::kernel::ids::{ExecutionId, SpawnMode, TenantId};
15use crate::kernel::ExecutionError;
16
17use super::target_binding::TargetBindingConfig;
18use super::trigger::{RetryConfig, TriggerId};
19
20/// BackgroundExecutionMode - How to run a background callable
21/// @see packages/enact-schemas/src/execution.schemas.ts - backgroundExecutionModeSchema
22#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
23#[serde(rename_all = "snake_case")]
24pub enum BackgroundExecutionMode {
25    /// Don't wait for result, no streaming
26    #[default]
27    FireAndForget,
28    /// Wait for result, but no streaming
29    Silent,
30    /// Queue for later execution
31    Deferred,
32}
33
34/// BackgroundExecutionStatus - Status of a background execution
35/// @see packages/enact-schemas/src/execution.schemas.ts - backgroundExecutionStatusSchema
36#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
37#[serde(rename_all = "snake_case")]
38pub enum BackgroundExecutionStatus {
39    /// Waiting to execute
40    #[default]
41    Queued,
42    /// Currently executing
43    Running,
44    /// Finished successfully
45    Completed,
46    /// Failed with error
47    Failed,
48    /// Cancelled before completion
49    Cancelled,
50    /// Exceeded timeout
51    Timeout,
52}
53
54/// BackgroundExecutionConfig - Configuration for background execution
55/// @see packages/enact-schemas/src/execution.schemas.ts - backgroundExecutionConfigSchema
56#[derive(Debug, Clone, Serialize, Deserialize)]
57#[serde(rename_all = "camelCase")]
58pub struct BackgroundExecutionConfig {
59    /// Execution mode
60    #[serde(default)]
61    pub mode: BackgroundExecutionMode,
62
63    /// Priority (higher = runs sooner)
64    #[serde(default = "default_priority")]
65    pub priority: u8,
66
67    /// Maximum execution time in milliseconds
68    #[serde(default = "default_timeout_ms")]
69    pub timeout_ms: u64,
70
71    /// Target binding for result
72    #[serde(skip_serializing_if = "Option::is_none")]
73    pub target_binding: Option<TargetBindingConfig>,
74
75    /// Callback URL (for deferred mode)
76    #[serde(skip_serializing_if = "Option::is_none")]
77    pub callback_url: Option<String>,
78
79    /// Retry configuration
80    #[serde(skip_serializing_if = "Option::is_none")]
81    pub retry: Option<RetryConfig>,
82}
83
/// Priority used when a configuration does not specify one.
fn default_priority() -> u8 {
    const DEFAULT_PRIORITY: u8 = 50;
    DEFAULT_PRIORITY
}
87
/// Timeout, in milliseconds, used when a configuration does not specify one.
fn default_timeout_ms() -> u64 {
    // Five minutes, spelled out as minutes * seconds * milliseconds.
    const MINUTES: u64 = 5;
    MINUTES * 60 * 1_000
}
91
92impl Default for BackgroundExecutionConfig {
93    fn default() -> Self {
94        Self {
95            mode: BackgroundExecutionMode::FireAndForget,
96            priority: default_priority(),
97            timeout_ms: default_timeout_ms(),
98            target_binding: None,
99            callback_url: None,
100            retry: None,
101        }
102    }
103}
104
105/// BackgroundExecution - A background execution record
106/// @see packages/enact-schemas/src/execution.schemas.ts - backgroundExecutionSchema
107#[derive(Debug, Clone, Serialize, Deserialize)]
108#[serde(rename_all = "camelCase")]
109pub struct BackgroundExecution {
110    /// Execution ID
111    pub execution_id: ExecutionId,
112
113    /// Tenant that owns this execution
114    pub tenant_id: TenantId,
115
116    /// Callable to invoke
117    pub callable_name: String,
118
119    /// Input to pass to the callable
120    #[serde(skip_serializing_if = "Option::is_none")]
121    pub input: Option<String>,
122
123    /// Context to pass
124    #[serde(skip_serializing_if = "Option::is_none")]
125    pub context: Option<HashMap<String, String>>,
126
127    /// Configuration
128    pub config: BackgroundExecutionConfig,
129
130    /// Current status
131    #[serde(default)]
132    pub status: BackgroundExecutionStatus,
133
134    /// When execution was queued
135    pub queued_at: DateTime<Utc>,
136
137    /// When execution started
138    #[serde(skip_serializing_if = "Option::is_none")]
139    pub started_at: Option<DateTime<Utc>>,
140
141    /// When execution completed
142    #[serde(skip_serializing_if = "Option::is_none")]
143    pub completed_at: Option<DateTime<Utc>>,
144
145    /// Output (populated on completion)
146    #[serde(skip_serializing_if = "Option::is_none")]
147    pub output: Option<serde_json::Value>,
148
149    /// Error (populated on failure)
150    #[serde(skip_serializing_if = "Option::is_none")]
151    pub error: Option<ExecutionError>,
152
153    /// Whether target binding was applied
154    #[serde(default)]
155    pub target_binding_applied: bool,
156
157    /// Trigger that spawned this execution (if any)
158    #[serde(skip_serializing_if = "Option::is_none")]
159    pub trigger_id: Option<TriggerId>,
160
161    /// Metadata
162    #[serde(skip_serializing_if = "Option::is_none")]
163    pub metadata: Option<HashMap<String, serde_json::Value>>,
164}
165
166impl BackgroundExecution {
167    /// Create a new background execution
168    pub fn new(
169        tenant_id: TenantId,
170        callable_name: impl Into<String>,
171        config: BackgroundExecutionConfig,
172    ) -> Self {
173        Self {
174            execution_id: ExecutionId::new(),
175            tenant_id,
176            callable_name: callable_name.into(),
177            input: None,
178            context: None,
179            config,
180            status: BackgroundExecutionStatus::Queued,
181            queued_at: Utc::now(),
182            started_at: None,
183            completed_at: None,
184            output: None,
185            error: None,
186            target_binding_applied: false,
187            trigger_id: None,
188            metadata: None,
189        }
190    }
191
192    /// Create from a trigger
193    pub fn from_trigger(
194        tenant_id: TenantId,
195        trigger_id: TriggerId,
196        callable_name: impl Into<String>,
197        input: Option<String>,
198        context: Option<HashMap<String, String>>,
199        target_binding: Option<TargetBindingConfig>,
200        retry: Option<RetryConfig>,
201    ) -> Self {
202        let config = BackgroundExecutionConfig {
203            mode: BackgroundExecutionMode::Silent,
204            target_binding,
205            retry,
206            ..Default::default()
207        };
208
209        let mut execution = Self::new(tenant_id, callable_name, config);
210        execution.trigger_id = Some(trigger_id);
211        execution.input = input;
212        execution.context = context;
213        execution
214    }
215
216    /// Create from SpawnMode::Child
217    ///
218    /// This bridges the gap between SpawnMode::Child { background: true, .. }
219    /// and the BackgroundExecution infrastructure.
220    ///
221    /// @see docs/TECHNICAL/32-SPAWN-MODE.md
222    pub fn from_spawn_mode(
223        spawn_mode: &SpawnMode,
224        tenant_id: TenantId,
225        callable_name: impl Into<String>,
226        input: Option<String>,
227        context: Option<HashMap<String, String>>,
228    ) -> Option<Self> {
229        match spawn_mode {
230            SpawnMode::Child {
231                background: true, ..
232            } => {
233                // Background=true means fire-and-forget execution
234                let config = BackgroundExecutionConfig {
235                    mode: BackgroundExecutionMode::FireAndForget,
236                    ..Default::default()
237                };
238
239                let mut execution = Self::new(tenant_id, callable_name, config);
240                execution.input = input;
241                execution.context = context;
242                Some(execution)
243            }
244            SpawnMode::Child {
245                background: false, ..
246            } => {
247                // Background=false means inline execution - not a background execution
248                None
249            }
250            SpawnMode::Inline => {
251                // Inline mode doesn't create a separate execution
252                None
253            }
254        }
255    }
256
257    /// Check if a SpawnMode should create a background execution
258    pub fn should_run_background(spawn_mode: &SpawnMode) -> bool {
259        matches!(
260            spawn_mode,
261            SpawnMode::Child {
262                background: true,
263                ..
264            }
265        )
266    }
267
268    /// Mark execution as started
269    pub fn start(&mut self) {
270        self.status = BackgroundExecutionStatus::Running;
271        self.started_at = Some(Utc::now());
272    }
273
274    /// Mark execution as completed
275    pub fn complete(&mut self, output: serde_json::Value) {
276        self.status = BackgroundExecutionStatus::Completed;
277        self.completed_at = Some(Utc::now());
278        self.output = Some(output);
279    }
280
281    /// Mark execution as failed
282    pub fn fail(&mut self, error: ExecutionError) {
283        self.status = BackgroundExecutionStatus::Failed;
284        self.completed_at = Some(Utc::now());
285        self.error = Some(error);
286    }
287
288    /// Mark execution as cancelled
289    pub fn cancel(&mut self) {
290        self.status = BackgroundExecutionStatus::Cancelled;
291        self.completed_at = Some(Utc::now());
292    }
293
294    /// Mark execution as timed out
295    pub fn timeout(&mut self) {
296        self.status = BackgroundExecutionStatus::Timeout;
297        self.completed_at = Some(Utc::now());
298    }
299
300    /// Check if execution has completed (successfully or not)
301    pub fn is_finished(&self) -> bool {
302        matches!(
303            self.status,
304            BackgroundExecutionStatus::Completed
305                | BackgroundExecutionStatus::Failed
306                | BackgroundExecutionStatus::Cancelled
307                | BackgroundExecutionStatus::Timeout
308        )
309    }
310
311    /// Check if execution succeeded
312    pub fn is_success(&self) -> bool {
313        self.status == BackgroundExecutionStatus::Completed
314    }
315
316    /// Calculate execution duration
317    pub fn duration_ms(&self) -> Option<i64> {
318        match (self.started_at, self.completed_at) {
319            (Some(start), Some(end)) => Some((end - start).num_milliseconds()),
320            _ => None,
321        }
322    }
323
324    /// Check if execution should suppress streaming events (silent mode)
325    pub fn is_silent(&self) -> bool {
326        matches!(
327            self.config.mode,
328            BackgroundExecutionMode::Silent | BackgroundExecutionMode::FireAndForget
329        )
330    }
331
332    /// Check if execution requires waiting for result
333    pub fn requires_result(&self) -> bool {
334        matches!(self.config.mode, BackgroundExecutionMode::Silent)
335    }
336}
337
338/// BackgroundExecutionQueue - Queue for managing background executions
339/// This is an in-memory implementation for testing. Production uses Redis/Postgres.
340#[derive(Debug, Default)]
341pub struct BackgroundExecutionQueue {
342    executions: std::collections::VecDeque<BackgroundExecution>,
343}
344
345impl BackgroundExecutionQueue {
346    /// Create a new empty queue
347    pub fn new() -> Self {
348        Self::default()
349    }
350
351    /// Enqueue a new execution
352    pub fn enqueue(&mut self, execution: BackgroundExecution) {
353        // Insert based on priority (higher priority first)
354        let pos = self
355            .executions
356            .iter()
357            .position(|e| e.config.priority < execution.config.priority)
358            .unwrap_or(self.executions.len());
359        self.executions.insert(pos, execution);
360    }
361
362    /// Dequeue the next execution to run
363    pub fn dequeue(&mut self) -> Option<BackgroundExecution> {
364        self.executions.pop_front()
365    }
366
367    /// Peek at the next execution without removing it
368    pub fn peek(&self) -> Option<&BackgroundExecution> {
369        self.executions.front()
370    }
371
372    /// Get queue length
373    pub fn len(&self) -> usize {
374        self.executions.len()
375    }
376
377    /// Check if queue is empty
378    pub fn is_empty(&self) -> bool {
379        self.executions.is_empty()
380    }
381
382    /// Get all queued execution IDs
383    pub fn execution_ids(&self) -> Vec<ExecutionId> {
384        self.executions
385            .iter()
386            .map(|e| e.execution_id.clone())
387            .collect()
388    }
389}
390
#[cfg(test)]
mod tests {
    use super::*;

    /// Execution named "test" whose config differs from the default only in `mode`.
    fn execution_with_mode(mode: BackgroundExecutionMode) -> BackgroundExecution {
        let config = BackgroundExecutionConfig {
            mode,
            ..Default::default()
        };
        BackgroundExecution::new(TenantId::new(), "test", config)
    }

    /// Execution whose config differs from the default only in `priority`.
    fn execution_with_priority(name: &str, priority: u8) -> BackgroundExecution {
        let config = BackgroundExecutionConfig {
            priority,
            ..Default::default()
        };
        BackgroundExecution::new(TenantId::new(), name, config)
    }

    #[test]
    fn test_background_execution_modes() {
        // (mode, expected is_silent, expected requires_result)
        let expectations = [
            (BackgroundExecutionMode::FireAndForget, true, false),
            (BackgroundExecutionMode::Silent, true, true),
            (BackgroundExecutionMode::Deferred, false, false),
        ];

        for (mode, silent, needs_result) in expectations {
            let exec = execution_with_mode(mode.clone());
            assert_eq!(exec.is_silent(), silent, "is_silent for {mode:?}");
            assert_eq!(
                exec.requires_result(),
                needs_result,
                "requires_result for {mode:?}"
            );
        }
    }

    #[test]
    fn test_background_execution_lifecycle() {
        let mut exec = BackgroundExecution::new(
            TenantId::new(),
            "test",
            BackgroundExecutionConfig::default(),
        );

        // Freshly created: queued and not finished.
        assert_eq!(exec.status, BackgroundExecutionStatus::Queued);
        assert!(!exec.is_finished());

        // Starting records the start time.
        exec.start();
        assert_eq!(exec.status, BackgroundExecutionStatus::Running);
        assert!(exec.started_at.is_some());

        // Completing makes it finished, successful, and measurable.
        let payload = serde_json::json!({"result": "success"});
        exec.complete(payload);
        assert_eq!(exec.status, BackgroundExecutionStatus::Completed);
        assert!(exec.is_finished());
        assert!(exec.is_success());
        assert!(exec.output.is_some());
        assert!(exec.duration_ms().is_some());
    }

    #[test]
    fn test_background_execution_queue() {
        let mut queue = BackgroundExecutionQueue::new();
        assert!(queue.is_empty());

        // Enqueue the low-priority entry first, then the high-priority one.
        queue.enqueue(execution_with_priority("low", 10));
        queue.enqueue(execution_with_priority("high", 90));
        assert_eq!(queue.len(), 2);

        // Draining must yield the high-priority entry before the low-priority one.
        let drained: Vec<String> = std::iter::from_fn(|| queue.dequeue())
            .map(|exec| exec.callable_name)
            .collect();
        assert_eq!(drained, ["high", "low"]);

        assert!(queue.is_empty());
    }

    #[test]
    fn test_spawn_mode_integration() {
        let tenant_id = TenantId::new();
        let child = |background: bool| SpawnMode::Child {
            background,
            inherit_inbox: false,
            policies: None,
        };

        // SpawnMode::Child with background=true creates a background execution.
        let backgrounded = child(true);
        assert!(BackgroundExecution::should_run_background(&backgrounded));

        let created = BackgroundExecution::from_spawn_mode(
            &backgrounded,
            tenant_id.clone(),
            "background_callable",
            Some("input data".to_string()),
            None,
        );
        assert!(created.is_some());
        let created = created.unwrap();
        assert_eq!(created.callable_name, "background_callable");
        assert_eq!(created.config.mode, BackgroundExecutionMode::FireAndForget);
        assert!(created.is_silent());

        // SpawnMode::Child with background=false does not create one.
        let foreground = child(false);
        assert!(!BackgroundExecution::should_run_background(&foreground));
        let not_created = BackgroundExecution::from_spawn_mode(
            &foreground,
            tenant_id.clone(),
            "sync_callable",
            None,
            None,
        );
        assert!(not_created.is_none());

        // SpawnMode::Inline does not create one either.
        let inline = SpawnMode::Inline;
        assert!(!BackgroundExecution::should_run_background(&inline));
        let not_created = BackgroundExecution::from_spawn_mode(
            &inline,
            tenant_id,
            "inline_callable",
            None,
            None,
        );
        assert!(not_created.is_none());
    }
}
537}