1use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13
14use crate::kernel::ids::{ExecutionId, SpawnMode, TenantId};
15use crate::kernel::ExecutionError;
16
17use super::target_binding::TargetBindingConfig;
18use super::trigger::{RetryConfig, TriggerId};
19
20#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
23#[serde(rename_all = "snake_case")]
24pub enum BackgroundExecutionMode {
25 #[default]
27 FireAndForget,
28 Silent,
30 Deferred,
32}
33
34#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
37#[serde(rename_all = "snake_case")]
38pub enum BackgroundExecutionStatus {
39 #[default]
41 Queued,
42 Running,
44 Completed,
46 Failed,
48 Cancelled,
50 Timeout,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
57#[serde(rename_all = "camelCase")]
58pub struct BackgroundExecutionConfig {
59 #[serde(default)]
61 pub mode: BackgroundExecutionMode,
62
63 #[serde(default = "default_priority")]
65 pub priority: u8,
66
67 #[serde(default = "default_timeout_ms")]
69 pub timeout_ms: u64,
70
71 #[serde(skip_serializing_if = "Option::is_none")]
73 pub target_binding: Option<TargetBindingConfig>,
74
75 #[serde(skip_serializing_if = "Option::is_none")]
77 pub callback_url: Option<String>,
78
79 #[serde(skip_serializing_if = "Option::is_none")]
81 pub retry: Option<RetryConfig>,
82}
83
84fn default_priority() -> u8 {
85 50
86}
87
88fn default_timeout_ms() -> u64 {
89 300000 }
91
92impl Default for BackgroundExecutionConfig {
93 fn default() -> Self {
94 Self {
95 mode: BackgroundExecutionMode::FireAndForget,
96 priority: default_priority(),
97 timeout_ms: default_timeout_ms(),
98 target_binding: None,
99 callback_url: None,
100 retry: None,
101 }
102 }
103}
104
105#[derive(Debug, Clone, Serialize, Deserialize)]
108#[serde(rename_all = "camelCase")]
109pub struct BackgroundExecution {
110 pub execution_id: ExecutionId,
112
113 pub tenant_id: TenantId,
115
116 pub callable_name: String,
118
119 #[serde(skip_serializing_if = "Option::is_none")]
121 pub input: Option<String>,
122
123 #[serde(skip_serializing_if = "Option::is_none")]
125 pub context: Option<HashMap<String, String>>,
126
127 pub config: BackgroundExecutionConfig,
129
130 #[serde(default)]
132 pub status: BackgroundExecutionStatus,
133
134 pub queued_at: DateTime<Utc>,
136
137 #[serde(skip_serializing_if = "Option::is_none")]
139 pub started_at: Option<DateTime<Utc>>,
140
141 #[serde(skip_serializing_if = "Option::is_none")]
143 pub completed_at: Option<DateTime<Utc>>,
144
145 #[serde(skip_serializing_if = "Option::is_none")]
147 pub output: Option<serde_json::Value>,
148
149 #[serde(skip_serializing_if = "Option::is_none")]
151 pub error: Option<ExecutionError>,
152
153 #[serde(default)]
155 pub target_binding_applied: bool,
156
157 #[serde(skip_serializing_if = "Option::is_none")]
159 pub trigger_id: Option<TriggerId>,
160
161 #[serde(skip_serializing_if = "Option::is_none")]
163 pub metadata: Option<HashMap<String, serde_json::Value>>,
164}
165
166impl BackgroundExecution {
167 pub fn new(
169 tenant_id: TenantId,
170 callable_name: impl Into<String>,
171 config: BackgroundExecutionConfig,
172 ) -> Self {
173 Self {
174 execution_id: ExecutionId::new(),
175 tenant_id,
176 callable_name: callable_name.into(),
177 input: None,
178 context: None,
179 config,
180 status: BackgroundExecutionStatus::Queued,
181 queued_at: Utc::now(),
182 started_at: None,
183 completed_at: None,
184 output: None,
185 error: None,
186 target_binding_applied: false,
187 trigger_id: None,
188 metadata: None,
189 }
190 }
191
192 pub fn from_trigger(
194 tenant_id: TenantId,
195 trigger_id: TriggerId,
196 callable_name: impl Into<String>,
197 input: Option<String>,
198 context: Option<HashMap<String, String>>,
199 target_binding: Option<TargetBindingConfig>,
200 retry: Option<RetryConfig>,
201 ) -> Self {
202 let config = BackgroundExecutionConfig {
203 mode: BackgroundExecutionMode::Silent,
204 target_binding,
205 retry,
206 ..Default::default()
207 };
208
209 let mut execution = Self::new(tenant_id, callable_name, config);
210 execution.trigger_id = Some(trigger_id);
211 execution.input = input;
212 execution.context = context;
213 execution
214 }
215
216 pub fn from_spawn_mode(
223 spawn_mode: &SpawnMode,
224 tenant_id: TenantId,
225 callable_name: impl Into<String>,
226 input: Option<String>,
227 context: Option<HashMap<String, String>>,
228 ) -> Option<Self> {
229 match spawn_mode {
230 SpawnMode::Child {
231 background: true, ..
232 } => {
233 let config = BackgroundExecutionConfig {
235 mode: BackgroundExecutionMode::FireAndForget,
236 ..Default::default()
237 };
238
239 let mut execution = Self::new(tenant_id, callable_name, config);
240 execution.input = input;
241 execution.context = context;
242 Some(execution)
243 }
244 SpawnMode::Child {
245 background: false, ..
246 } => {
247 None
249 }
250 SpawnMode::Inline => {
251 None
253 }
254 }
255 }
256
257 pub fn should_run_background(spawn_mode: &SpawnMode) -> bool {
259 matches!(
260 spawn_mode,
261 SpawnMode::Child {
262 background: true,
263 ..
264 }
265 )
266 }
267
268 pub fn start(&mut self) {
270 self.status = BackgroundExecutionStatus::Running;
271 self.started_at = Some(Utc::now());
272 }
273
274 pub fn complete(&mut self, output: serde_json::Value) {
276 self.status = BackgroundExecutionStatus::Completed;
277 self.completed_at = Some(Utc::now());
278 self.output = Some(output);
279 }
280
281 pub fn fail(&mut self, error: ExecutionError) {
283 self.status = BackgroundExecutionStatus::Failed;
284 self.completed_at = Some(Utc::now());
285 self.error = Some(error);
286 }
287
288 pub fn cancel(&mut self) {
290 self.status = BackgroundExecutionStatus::Cancelled;
291 self.completed_at = Some(Utc::now());
292 }
293
294 pub fn timeout(&mut self) {
296 self.status = BackgroundExecutionStatus::Timeout;
297 self.completed_at = Some(Utc::now());
298 }
299
300 pub fn is_finished(&self) -> bool {
302 matches!(
303 self.status,
304 BackgroundExecutionStatus::Completed
305 | BackgroundExecutionStatus::Failed
306 | BackgroundExecutionStatus::Cancelled
307 | BackgroundExecutionStatus::Timeout
308 )
309 }
310
311 pub fn is_success(&self) -> bool {
313 self.status == BackgroundExecutionStatus::Completed
314 }
315
316 pub fn duration_ms(&self) -> Option<i64> {
318 match (self.started_at, self.completed_at) {
319 (Some(start), Some(end)) => Some((end - start).num_milliseconds()),
320 _ => None,
321 }
322 }
323
324 pub fn is_silent(&self) -> bool {
326 matches!(
327 self.config.mode,
328 BackgroundExecutionMode::Silent | BackgroundExecutionMode::FireAndForget
329 )
330 }
331
332 pub fn requires_result(&self) -> bool {
334 matches!(self.config.mode, BackgroundExecutionMode::Silent)
335 }
336}
337
338#[derive(Debug, Default)]
341pub struct BackgroundExecutionQueue {
342 executions: std::collections::VecDeque<BackgroundExecution>,
343}
344
345impl BackgroundExecutionQueue {
346 pub fn new() -> Self {
348 Self::default()
349 }
350
351 pub fn enqueue(&mut self, execution: BackgroundExecution) {
353 let pos = self
355 .executions
356 .iter()
357 .position(|e| e.config.priority < execution.config.priority)
358 .unwrap_or(self.executions.len());
359 self.executions.insert(pos, execution);
360 }
361
362 pub fn dequeue(&mut self) -> Option<BackgroundExecution> {
364 self.executions.pop_front()
365 }
366
367 pub fn peek(&self) -> Option<&BackgroundExecution> {
369 self.executions.front()
370 }
371
372 pub fn len(&self) -> usize {
374 self.executions.len()
375 }
376
377 pub fn is_empty(&self) -> bool {
379 self.executions.is_empty()
380 }
381
382 pub fn execution_ids(&self) -> Vec<ExecutionId> {
384 self.executions
385 .iter()
386 .map(|e| e.execution_id.clone())
387 .collect()
388 }
389}
390
391#[cfg(test)]
392mod tests {
393 use super::*;
394
395 #[test]
396 fn test_background_execution_modes() {
397 let config = BackgroundExecutionConfig {
399 mode: BackgroundExecutionMode::FireAndForget,
400 ..Default::default()
401 };
402 let exec = BackgroundExecution::new(TenantId::new(), "test", config);
403 assert!(exec.is_silent());
404 assert!(!exec.requires_result());
405
406 let config = BackgroundExecutionConfig {
408 mode: BackgroundExecutionMode::Silent,
409 ..Default::default()
410 };
411 let exec = BackgroundExecution::new(TenantId::new(), "test", config);
412 assert!(exec.is_silent());
413 assert!(exec.requires_result());
414
415 let config = BackgroundExecutionConfig {
417 mode: BackgroundExecutionMode::Deferred,
418 ..Default::default()
419 };
420 let exec = BackgroundExecution::new(TenantId::new(), "test", config);
421 assert!(!exec.is_silent());
422 assert!(!exec.requires_result());
423 }
424
425 #[test]
426 fn test_background_execution_lifecycle() {
427 let config = BackgroundExecutionConfig::default();
428 let mut exec = BackgroundExecution::new(TenantId::new(), "test", config);
429
430 assert_eq!(exec.status, BackgroundExecutionStatus::Queued);
431 assert!(!exec.is_finished());
432
433 exec.start();
434 assert_eq!(exec.status, BackgroundExecutionStatus::Running);
435 assert!(exec.started_at.is_some());
436
437 exec.complete(serde_json::json!({"result": "success"}));
438 assert_eq!(exec.status, BackgroundExecutionStatus::Completed);
439 assert!(exec.is_finished());
440 assert!(exec.is_success());
441 assert!(exec.output.is_some());
442 assert!(exec.duration_ms().is_some());
443 }
444
445 #[test]
446 fn test_background_execution_queue() {
447 let mut queue = BackgroundExecutionQueue::new();
448 assert!(queue.is_empty());
449
450 let low = BackgroundExecution::new(
452 TenantId::new(),
453 "low",
454 BackgroundExecutionConfig {
455 priority: 10,
456 ..Default::default()
457 },
458 );
459 queue.enqueue(low);
460
461 let high = BackgroundExecution::new(
463 TenantId::new(),
464 "high",
465 BackgroundExecutionConfig {
466 priority: 90,
467 ..Default::default()
468 },
469 );
470 queue.enqueue(high);
471
472 assert_eq!(queue.len(), 2);
473
474 let first = queue.dequeue().unwrap();
476 assert_eq!(first.callable_name, "high");
477
478 let second = queue.dequeue().unwrap();
479 assert_eq!(second.callable_name, "low");
480
481 assert!(queue.is_empty());
482 }
483
484 #[test]
485 fn test_spawn_mode_integration() {
486 let tenant_id = TenantId::new();
487
488 let spawn_mode = SpawnMode::Child {
490 background: true,
491 inherit_inbox: false,
492 policies: None,
493 };
494 assert!(BackgroundExecution::should_run_background(&spawn_mode));
495
496 let exec = BackgroundExecution::from_spawn_mode(
497 &spawn_mode,
498 tenant_id.clone(),
499 "background_callable",
500 Some("input data".to_string()),
501 None,
502 );
503 assert!(exec.is_some());
504 let exec = exec.unwrap();
505 assert_eq!(exec.callable_name, "background_callable");
506 assert_eq!(exec.config.mode, BackgroundExecutionMode::FireAndForget);
507 assert!(exec.is_silent());
508
509 let spawn_mode = SpawnMode::Child {
511 background: false,
512 inherit_inbox: false,
513 policies: None,
514 };
515 assert!(!BackgroundExecution::should_run_background(&spawn_mode));
516 let exec = BackgroundExecution::from_spawn_mode(
517 &spawn_mode,
518 tenant_id.clone(),
519 "sync_callable",
520 None,
521 None,
522 );
523 assert!(exec.is_none());
524
525 let spawn_mode = SpawnMode::Inline;
527 assert!(!BackgroundExecution::should_run_background(&spawn_mode));
528 let exec = BackgroundExecution::from_spawn_mode(
529 &spawn_mode,
530 tenant_id,
531 "inline_callable",
532 None,
533 None,
534 );
535 assert!(exec.is_none());
536 }
537}