Skip to main content

chasm/automation/
mod.rs

1// Copyright (c) 2024-2026 Nervosys LLC
2// SPDX-License-Identifier: AGPL-3.0-only
3//! Automation Engine Module
4//!
5//! Provides workflow automation capabilities:
6//! - Rule-based triggers and actions
7//! - Scheduled tasks
8//! - Event-driven workflows
9//! - Conditional logic
10//! - Action chaining
11
12use anyhow::{anyhow, Result};
13use chrono::{DateTime, Datelike, Duration, NaiveTime, Timelike, Utc, Weekday};
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::sync::Arc;
17use tokio::sync::RwLock;
18
19// =============================================================================
20// Workflow Definition
21// =============================================================================
22
23/// Workflow definition
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct Workflow {
26    /// Workflow ID
27    pub id: String,
28    /// Workflow name
29    pub name: String,
30    /// Description
31    pub description: Option<String>,
32    /// Whether workflow is enabled
33    pub enabled: bool,
34    /// Trigger conditions
35    pub triggers: Vec<Trigger>,
36    /// Conditions that must be met
37    pub conditions: Vec<Condition>,
38    /// Actions to execute
39    pub actions: Vec<Action>,
40    /// Error handling strategy
41    pub on_error: ErrorStrategy,
42    /// Created timestamp
43    pub created_at: DateTime<Utc>,
44    /// Last modified timestamp
45    pub updated_at: DateTime<Utc>,
46    /// Last executed timestamp
47    pub last_run: Option<DateTime<Utc>>,
48    /// Execution count
49    pub run_count: u64,
50}
51
52// =============================================================================
53// Triggers
54// =============================================================================
55
56/// Trigger that starts a workflow
57#[derive(Debug, Clone, Serialize, Deserialize)]
58#[serde(tag = "type", rename_all = "snake_case")]
59pub enum Trigger {
60    /// Event-based trigger
61    Event {
62        /// Event type to listen for
63        event_type: String,
64        /// Optional filter conditions
65        filter: Option<HashMap<String, serde_json::Value>>,
66    },
67    /// Schedule-based trigger
68    Schedule {
69        /// Cron expression
70        cron: String,
71        /// Timezone
72        timezone: Option<String>,
73    },
74    /// Interval-based trigger
75    Interval {
76        /// Interval in seconds
77        seconds: u64,
78    },
79    /// Time of day trigger
80    TimeOfDay {
81        /// Time to trigger
82        time: NaiveTime,
83        /// Days of week (None = every day)
84        days: Option<Vec<Weekday>>,
85    },
86    /// Manual trigger
87    Manual,
88    /// Webhook trigger
89    Webhook {
90        /// Webhook path
91        path: String,
92        /// HTTP methods
93        methods: Vec<String>,
94    },
95    /// File change trigger
96    FileChange {
97        /// Path pattern
98        pattern: String,
99        /// Change types
100        events: Vec<FileChangeEvent>,
101    },
102}
103
104/// File change event types
105#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
106#[serde(rename_all = "snake_case")]
107pub enum FileChangeEvent {
108    Created,
109    Modified,
110    Deleted,
111    Renamed,
112}
113
114// =============================================================================
115// Conditions
116// =============================================================================
117
118/// Condition that must be met for workflow to execute
119#[derive(Debug, Clone, Serialize, Deserialize)]
120#[serde(tag = "type", rename_all = "snake_case")]
121pub enum Condition {
122    /// All sub-conditions must be true
123    And { conditions: Vec<Condition> },
124    /// Any sub-condition must be true
125    Or { conditions: Vec<Condition> },
126    /// Negate a condition
127    Not { condition: Box<Condition> },
128    /// Compare a value
129    Compare {
130        /// Left operand (supports variable interpolation)
131        left: String,
132        /// Comparison operator
133        operator: CompareOp,
134        /// Right operand
135        right: serde_json::Value,
136    },
137    /// Check if value exists
138    Exists {
139        /// Path to check
140        path: String,
141    },
142    /// Check if value matches pattern
143    Matches {
144        /// Value to check
145        value: String,
146        /// Regex pattern
147        pattern: String,
148    },
149    /// Time-based condition
150    TimeWindow {
151        /// Start time
152        start: NaiveTime,
153        /// End time
154        end: NaiveTime,
155        /// Days of week (None = every day)
156        days: Option<Vec<Weekday>>,
157    },
158    /// Custom expression
159    Expression {
160        /// Expression string
161        expr: String,
162    },
163}
164
165/// Comparison operator
166#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
167#[serde(rename_all = "snake_case")]
168pub enum CompareOp {
169    Equals,
170    NotEquals,
171    GreaterThan,
172    GreaterOrEqual,
173    LessThan,
174    LessOrEqual,
175    Contains,
176    StartsWith,
177    EndsWith,
178    In,
179}
180
181// =============================================================================
182// Actions
183// =============================================================================
184
185/// Action to execute in a workflow
186#[derive(Debug, Clone, Serialize, Deserialize)]
187#[serde(tag = "type", rename_all = "snake_case")]
188pub enum Action {
189    /// Export sessions
190    Export {
191        /// Session filter
192        filter: SessionFilter,
193        /// Export format
194        format: String,
195        /// Output path
196        output: String,
197    },
198    /// Archive sessions
199    Archive {
200        /// Session filter
201        filter: SessionFilter,
202        /// Archive location
203        destination: String,
204    },
205    /// Delete sessions
206    Delete {
207        /// Session filter
208        filter: SessionFilter,
209    },
210    /// Tag sessions
211    Tag {
212        /// Session filter
213        filter: SessionFilter,
214        /// Tags to add
215        add_tags: Vec<String>,
216        /// Tags to remove
217        remove_tags: Vec<String>,
218    },
219    /// Sync sessions
220    Sync {
221        /// Provider to sync with
222        provider: String,
223        /// Direction
224        direction: SyncDirection,
225    },
226    /// Run harvest
227    Harvest {
228        /// Provider to harvest from
229        provider: Option<String>,
230    },
231    /// Execute plugin
232    Plugin {
233        /// Plugin ID
234        plugin_id: String,
235        /// Plugin action
236        action: String,
237        /// Action parameters
238        params: HashMap<String, serde_json::Value>,
239    },
240    /// Send notification
241    Notify {
242        /// Notification channel
243        channel: NotificationChannel,
244        /// Message template
245        message: String,
246        /// Message title
247        title: Option<String>,
248    },
249    /// Make HTTP request
250    Http {
251        /// Request URL
252        url: String,
253        /// HTTP method
254        method: String,
255        /// Request headers
256        headers: HashMap<String, String>,
257        /// Request body
258        body: Option<String>,
259    },
260    /// Execute shell command
261    Shell {
262        /// Command to execute
263        command: String,
264        /// Working directory
265        cwd: Option<String>,
266        /// Environment variables
267        env: HashMap<String, String>,
268    },
269    /// Set variable
270    SetVariable {
271        /// Variable name
272        name: String,
273        /// Variable value
274        value: serde_json::Value,
275    },
276    /// Conditional action
277    If {
278        /// Condition
279        condition: Condition,
280        /// Actions if true
281        then: Vec<Action>,
282        /// Actions if false
283        else_: Option<Vec<Action>>,
284    },
285    /// Loop action
286    ForEach {
287        /// Items to iterate
288        items: String,
289        /// Variable name for current item
290        as_var: String,
291        /// Actions to execute for each item
292        actions: Vec<Action>,
293    },
294    /// Delay action
295    Delay {
296        /// Delay in seconds
297        seconds: u64,
298    },
299    /// Log message
300    Log {
301        /// Log level
302        level: LogLevel,
303        /// Message
304        message: String,
305    },
306}
307
308/// Session filter for actions
309#[derive(Debug, Clone, Serialize, Deserialize)]
310pub struct SessionFilter {
311    /// Provider filter
312    pub provider: Option<String>,
313    /// Age filter (older than N days)
314    pub older_than_days: Option<u32>,
315    /// Tags filter
316    pub tags: Option<Vec<String>>,
317    /// Query filter
318    pub query: Option<String>,
319}
320
321/// Sync direction
322#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
323#[serde(rename_all = "snake_case")]
324pub enum SyncDirection {
325    Push,
326    Pull,
327    Both,
328}
329
330/// Notification channel
331#[derive(Debug, Clone, Serialize, Deserialize)]
332#[serde(tag = "type", rename_all = "snake_case")]
333pub enum NotificationChannel {
334    /// System notification
335    System,
336    /// Email
337    Email { to: String },
338    /// Slack webhook
339    Slack { webhook_url: String },
340    /// Discord webhook
341    Discord { webhook_url: String },
342    /// Generic webhook
343    Webhook { url: String },
344}
345
346/// Log level
347#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
348#[serde(rename_all = "snake_case")]
349pub enum LogLevel {
350    Debug,
351    Info,
352    Warning,
353    Error,
354}
355
356/// Error handling strategy
357#[derive(Debug, Clone, Serialize, Deserialize)]
358#[serde(rename_all = "snake_case")]
359pub enum ErrorStrategy {
360    /// Stop workflow on error
361    Stop,
362    /// Continue to next action
363    Continue,
364    /// Retry N times
365    Retry {
366        max_attempts: u32,
367        delay_seconds: u64,
368    },
369    /// Execute fallback actions
370    Fallback { actions: Vec<Action> },
371}
372
373// =============================================================================
374// Execution Context
375// =============================================================================
376
377/// Workflow execution context
378#[derive(Debug, Clone)]
379pub struct ExecutionContext {
380    /// Workflow ID
381    pub workflow_id: String,
382    /// Run ID
383    pub run_id: String,
384    /// Trigger event
385    pub trigger_event: Option<serde_json::Value>,
386    /// Variables
387    pub variables: HashMap<String, serde_json::Value>,
388    /// Start time
389    pub started_at: DateTime<Utc>,
390    /// Action results
391    pub results: Vec<ActionResult>,
392}
393
394impl ExecutionContext {
395    pub fn new(workflow_id: String, trigger_event: Option<serde_json::Value>) -> Self {
396        Self {
397            workflow_id,
398            run_id: uuid::Uuid::new_v4().to_string(),
399            trigger_event,
400            variables: HashMap::new(),
401            started_at: Utc::now(),
402            results: Vec::new(),
403        }
404    }
405
406    /// Get variable value
407    pub fn get_var(&self, name: &str) -> Option<&serde_json::Value> {
408        self.variables.get(name)
409    }
410
411    /// Set variable value
412    pub fn set_var(&mut self, name: String, value: serde_json::Value) {
413        self.variables.insert(name, value);
414    }
415
416    /// Interpolate variables in a string
417    pub fn interpolate(&self, template: &str) -> String {
418        let mut result = template.to_string();
419
420        for (key, value) in &self.variables {
421            let placeholder = format!("{{{{{}}}}}", key);
422            let replacement = match value {
423                serde_json::Value::String(s) => s.clone(),
424                other => other.to_string(),
425            };
426            result = result.replace(&placeholder, &replacement);
427        }
428
429        result
430    }
431}
432
433/// Result of an action execution
434#[derive(Debug, Clone, Serialize, Deserialize)]
435pub struct ActionResult {
436    /// Action index
437    pub action_index: usize,
438    /// Success status
439    pub success: bool,
440    /// Result data
441    pub data: Option<serde_json::Value>,
442    /// Error message
443    pub error: Option<String>,
444    /// Execution time (ms)
445    pub duration_ms: u64,
446    /// Timestamp
447    pub executed_at: DateTime<Utc>,
448}
449
450// =============================================================================
451// Automation Engine
452// =============================================================================
453
454/// Workflow run status
455#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
456#[serde(rename_all = "snake_case")]
457pub enum RunStatus {
458    Running,
459    Completed,
460    Failed,
461    Cancelled,
462}
463
464/// Workflow run record
465#[derive(Debug, Clone, Serialize, Deserialize)]
466pub struct WorkflowRun {
467    /// Run ID
468    pub id: String,
469    /// Workflow ID
470    pub workflow_id: String,
471    /// Status
472    pub status: RunStatus,
473    /// Trigger that started the run
474    pub trigger: String,
475    /// Start time
476    pub started_at: DateTime<Utc>,
477    /// End time
478    pub ended_at: Option<DateTime<Utc>>,
479    /// Action results
480    pub results: Vec<ActionResult>,
481    /// Error message if failed
482    pub error: Option<String>,
483}
484
485/// Automation engine
486pub struct AutomationEngine {
487    /// Registered workflows
488    workflows: Arc<RwLock<HashMap<String, Workflow>>>,
489    /// Run history
490    runs: Arc<RwLock<Vec<WorkflowRun>>>,
491    /// Max history size
492    max_history: usize,
493}
494
495impl AutomationEngine {
496    pub fn new(max_history: usize) -> Self {
497        Self {
498            workflows: Arc::new(RwLock::new(HashMap::new())),
499            runs: Arc::new(RwLock::new(Vec::new())),
500            max_history,
501        }
502    }
503
504    /// Register a workflow
505    pub async fn register(&self, workflow: Workflow) -> Result<()> {
506        self.validate_workflow(&workflow)?;
507        self.workflows
508            .write()
509            .await
510            .insert(workflow.id.clone(), workflow);
511        Ok(())
512    }
513
514    /// Validate workflow definition
515    fn validate_workflow(&self, workflow: &Workflow) -> Result<()> {
516        if workflow.id.is_empty() {
517            return Err(anyhow!("Workflow ID cannot be empty"));
518        }
519        if workflow.triggers.is_empty() {
520            return Err(anyhow!("Workflow must have at least one trigger"));
521        }
522        if workflow.actions.is_empty() {
523            return Err(anyhow!("Workflow must have at least one action"));
524        }
525        Ok(())
526    }
527
528    /// Unregister a workflow
529    pub async fn unregister(&self, workflow_id: &str) -> Result<()> {
530        self.workflows
531            .write()
532            .await
533            .remove(workflow_id)
534            .ok_or_else(|| anyhow!("Workflow not found: {}", workflow_id))?;
535        Ok(())
536    }
537
538    /// Get workflow
539    pub async fn get_workflow(&self, workflow_id: &str) -> Option<Workflow> {
540        self.workflows.read().await.get(workflow_id).cloned()
541    }
542
543    /// List all workflows
544    pub async fn list_workflows(&self) -> Vec<Workflow> {
545        self.workflows.read().await.values().cloned().collect()
546    }
547
548    /// Trigger a workflow manually
549    pub async fn trigger(
550        &self,
551        workflow_id: &str,
552        event: Option<serde_json::Value>,
553    ) -> Result<String> {
554        let workflow = self
555            .workflows
556            .read()
557            .await
558            .get(workflow_id)
559            .cloned()
560            .ok_or_else(|| anyhow!("Workflow not found: {}", workflow_id))?;
561
562        if !workflow.enabled {
563            return Err(anyhow!("Workflow is disabled"));
564        }
565
566        self.execute_workflow(&workflow, event).await
567    }
568
569    /// Execute a workflow
570    async fn execute_workflow(
571        &self,
572        workflow: &Workflow,
573        event: Option<serde_json::Value>,
574    ) -> Result<String> {
575        let mut ctx = ExecutionContext::new(workflow.id.clone(), event);
576
577        // Check conditions
578        for condition in &workflow.conditions {
579            if !self.evaluate_condition(condition, &ctx).await? {
580                return Err(anyhow!("Workflow conditions not met"));
581            }
582        }
583
584        let run = WorkflowRun {
585            id: ctx.run_id.clone(),
586            workflow_id: workflow.id.clone(),
587            status: RunStatus::Running,
588            trigger: "manual".to_string(),
589            started_at: ctx.started_at,
590            ended_at: None,
591            results: Vec::new(),
592            error: None,
593        };
594
595        self.record_run(run.clone()).await;
596
597        // Execute actions
598        let mut final_status = RunStatus::Completed;
599        let mut final_error = None;
600
601        for (i, action) in workflow.actions.iter().enumerate() {
602            let start = std::time::Instant::now();
603
604            match self.execute_action(action, &mut ctx).await {
605                Ok(data) => {
606                    ctx.results.push(ActionResult {
607                        action_index: i,
608                        success: true,
609                        data,
610                        error: None,
611                        duration_ms: start.elapsed().as_millis() as u64,
612                        executed_at: Utc::now(),
613                    });
614                }
615                Err(e) => {
616                    ctx.results.push(ActionResult {
617                        action_index: i,
618                        success: false,
619                        data: None,
620                        error: Some(e.to_string()),
621                        duration_ms: start.elapsed().as_millis() as u64,
622                        executed_at: Utc::now(),
623                    });
624
625                    match &workflow.on_error {
626                        ErrorStrategy::Stop => {
627                            final_status = RunStatus::Failed;
628                            final_error = Some(e.to_string());
629                            break;
630                        }
631                        ErrorStrategy::Continue => continue,
632                        ErrorStrategy::Retry {
633                            max_attempts,
634                            delay_seconds,
635                        } => {
636                            // Simple retry logic
637                            let mut retry_success = false;
638                            for _ in 0..*max_attempts {
639                                tokio::time::sleep(tokio::time::Duration::from_secs(
640                                    *delay_seconds,
641                                ))
642                                .await;
643                                if self.execute_action(action, &mut ctx).await.is_ok() {
644                                    retry_success = true;
645                                    break;
646                                }
647                            }
648                            if !retry_success {
649                                final_status = RunStatus::Failed;
650                                final_error = Some(e.to_string());
651                                break;
652                            }
653                        }
654                        ErrorStrategy::Fallback { actions } => {
655                            for fallback in actions {
656                                self.execute_action(fallback, &mut ctx).await.ok();
657                            }
658                        }
659                    }
660                }
661            }
662        }
663
664        // Update run record
665        self.update_run(&ctx.run_id, final_status, final_error, ctx.results)
666            .await;
667
668        // Update workflow stats
669        self.update_workflow_stats(&workflow.id).await;
670
671        Ok(ctx.run_id)
672    }
673
674    /// Evaluate a condition (boxed for recursion)
675    fn evaluate_condition<'a>(
676        &'a self,
677        condition: &'a Condition,
678        ctx: &'a ExecutionContext,
679    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<bool>> + Send + 'a>> {
680        Box::pin(async move {
681            match condition {
682                Condition::And { conditions } => {
683                    for c in conditions {
684                        if !self.evaluate_condition(c, ctx).await? {
685                            return Ok(false);
686                        }
687                    }
688                    Ok(true)
689                }
690                Condition::Or { conditions } => {
691                    for c in conditions {
692                        if self.evaluate_condition(c, ctx).await? {
693                            return Ok(true);
694                        }
695                    }
696                    Ok(false)
697                }
698                Condition::Not { condition } => {
699                    Ok(!self.evaluate_condition(condition, ctx).await?)
700                }
701                Condition::Compare {
702                    left,
703                    operator,
704                    right,
705                } => {
706                    let left_val = ctx.interpolate(left);
707                    self.compare_values(&left_val, operator, right)
708                }
709                Condition::Exists { path } => Ok(ctx.variables.contains_key(path)),
710                Condition::Matches { value, pattern } => {
711                    let val = ctx.interpolate(value);
712                    let re = regex::Regex::new(pattern)?;
713                    Ok(re.is_match(&val))
714                }
715                Condition::TimeWindow { start, end, days } => {
716                    let now = Utc::now();
717                    let current_time = now.time();
718                    let current_day = now.weekday();
719
720                    // Check day of week
721                    if let Some(valid_days) = days {
722                        if !valid_days.contains(&current_day) {
723                            return Ok(false);
724                        }
725                    }
726
727                    // Check time window
728                    if start <= end {
729                        Ok(current_time >= *start && current_time <= *end)
730                    } else {
731                        // Window spans midnight
732                        Ok(current_time >= *start || current_time <= *end)
733                    }
734                }
735                Condition::Expression { expr: _ } => {
736                    // Would need expression evaluator
737                    Ok(true)
738                }
739            }
740        })
741    }
742
743    fn compare_values(
744        &self,
745        left: &str,
746        op: &CompareOp,
747        right: &serde_json::Value,
748    ) -> Result<bool> {
749        match op {
750            CompareOp::Equals => {
751                if let serde_json::Value::String(s) = right {
752                    Ok(left == s)
753                } else {
754                    Ok(left == right.to_string())
755                }
756            }
757            CompareOp::NotEquals => {
758                if let serde_json::Value::String(s) = right {
759                    Ok(left != s)
760                } else {
761                    Ok(left != right.to_string())
762                }
763            }
764            CompareOp::Contains => {
765                if let serde_json::Value::String(s) = right {
766                    Ok(left.contains(s.as_str()))
767                } else {
768                    Ok(false)
769                }
770            }
771            CompareOp::StartsWith => {
772                if let serde_json::Value::String(s) = right {
773                    Ok(left.starts_with(s.as_str()))
774                } else {
775                    Ok(false)
776                }
777            }
778            CompareOp::EndsWith => {
779                if let serde_json::Value::String(s) = right {
780                    Ok(left.ends_with(s.as_str()))
781                } else {
782                    Ok(false)
783                }
784            }
785            CompareOp::In => {
786                if let serde_json::Value::Array(arr) = right {
787                    for item in arr {
788                        if let serde_json::Value::String(s) = item {
789                            if left == s {
790                                return Ok(true);
791                            }
792                        }
793                    }
794                }
795                Ok(false)
796            }
797            _ => {
798                // Numeric comparisons
799                let left_num: f64 = left.parse()?;
800                let right_num = match right {
801                    serde_json::Value::Number(n) => n.as_f64().unwrap_or(0.0),
802                    _ => right.to_string().parse()?,
803                };
804
805                Ok(match op {
806                    CompareOp::GreaterThan => left_num > right_num,
807                    CompareOp::GreaterOrEqual => left_num >= right_num,
808                    CompareOp::LessThan => left_num < right_num,
809                    CompareOp::LessOrEqual => left_num <= right_num,
810                    _ => false,
811                })
812            }
813        }
814    }
815
816    /// Execute an action (boxed for recursion)
817    fn execute_action<'a>(
818        &'a self,
819        action: &'a Action,
820        ctx: &'a mut ExecutionContext,
821    ) -> std::pin::Pin<
822        Box<dyn std::future::Future<Output = Result<Option<serde_json::Value>>> + Send + 'a>,
823    > {
824        Box::pin(async move {
825            match action {
826                Action::Export {
827                    filter,
828                    format,
829                    output,
830                } => {
831                    // Would call actual export logic
832                    log::info!(
833                        "Exporting sessions: filter={:?}, format={}, output={}",
834                        filter,
835                        format,
836                        output
837                    );
838                    Ok(Some(serde_json::json!({ "exported": true })))
839                }
840                Action::Archive {
841                    filter,
842                    destination,
843                } => {
844                    log::info!(
845                        "Archiving sessions: filter={:?}, destination={}",
846                        filter,
847                        destination
848                    );
849                    Ok(Some(serde_json::json!({ "archived": true })))
850                }
851                Action::Delete { filter } => {
852                    log::info!("Deleting sessions: filter={:?}", filter);
853                    Ok(Some(serde_json::json!({ "deleted": true })))
854                }
855                Action::Tag {
856                    filter,
857                    add_tags,
858                    remove_tags,
859                } => {
860                    log::info!(
861                        "Tagging sessions: filter={:?}, add={:?}, remove={:?}",
862                        filter,
863                        add_tags,
864                        remove_tags
865                    );
866                    Ok(Some(serde_json::json!({ "tagged": true })))
867                }
868                Action::Sync {
869                    provider,
870                    direction,
871                } => {
872                    log::info!(
873                        "Syncing with provider {}: direction={:?}",
874                        provider,
875                        direction
876                    );
877                    Ok(Some(serde_json::json!({ "synced": true })))
878                }
879                Action::Harvest { provider } => {
880                    log::info!("Harvesting from provider: {:?}", provider);
881                    Ok(Some(serde_json::json!({ "harvested": true })))
882                }
883                Action::Plugin {
884                    plugin_id,
885                    action,
886                    params,
887                } => {
888                    log::info!(
889                        "Executing plugin {}: action={}, params={:?}",
890                        plugin_id,
891                        action,
892                        params
893                    );
894                    Ok(Some(serde_json::json!({ "plugin_executed": true })))
895                }
896                Action::Notify {
897                    channel,
898                    message,
899                    title,
900                } => {
901                    let msg = ctx.interpolate(message);
902                    log::info!(
903                        "Sending notification: channel={:?}, title={:?}, message={}",
904                        channel,
905                        title,
906                        msg
907                    );
908                    Ok(Some(serde_json::json!({ "notified": true })))
909                }
910                Action::Http {
911                    url,
912                    method,
913                    headers: _,
914                    body: _,
915                } => {
916                    let url = ctx.interpolate(url);
917                    log::info!("HTTP request: {} {}", method, url);
918                    // Would make actual HTTP request
919                    Ok(Some(serde_json::json!({ "status": 200 })))
920                }
921                Action::Shell { command, cwd, env: _ } => {
922                    let cmd = ctx.interpolate(command);
923                    log::info!("Executing shell: {} (cwd={:?})", cmd, cwd);
924                    // Would execute actual command
925                    Ok(Some(serde_json::json!({ "exit_code": 0 })))
926                }
927                Action::SetVariable { name, value } => {
928                    ctx.set_var(name.clone(), value.clone());
929                    Ok(None)
930                }
931                Action::If {
932                    condition,
933                    then,
934                    else_,
935                } => {
936                    if self.evaluate_condition(condition, ctx).await? {
937                        for action in then {
938                            self.execute_action(action, ctx).await?;
939                        }
940                    } else if let Some(else_actions) = else_ {
941                        for action in else_actions {
942                            self.execute_action(action, ctx).await?;
943                        }
944                    }
945                    Ok(None)
946                }
947                Action::ForEach {
948                    items,
949                    as_var,
950                    actions,
951                } => {
952                    if let Some(arr) = ctx.get_var(items) {
953                        if let serde_json::Value::Array(items_arr) = arr.clone() {
954                            for item in items_arr {
955                                ctx.set_var(as_var.clone(), item);
956                                for action in actions {
957                                    self.execute_action(action, ctx).await?;
958                                }
959                            }
960                        }
961                    }
962                    Ok(None)
963                }
964                Action::Delay { seconds } => {
965                    tokio::time::sleep(tokio::time::Duration::from_secs(*seconds)).await;
966                    Ok(None)
967                }
968                Action::Log { level, message } => {
969                    let msg = ctx.interpolate(message);
970                    match level {
971                        LogLevel::Debug => log::debug!("{}", msg),
972                        LogLevel::Info => log::info!("{}", msg),
973                        LogLevel::Warning => log::warn!("{}", msg),
974                        LogLevel::Error => log::error!("{}", msg),
975                    }
976                    Ok(None)
977                }
978            }
979        })
980    }
981
982    async fn record_run(&self, run: WorkflowRun) {
983        let mut runs = self.runs.write().await;
984        runs.push(run);
985
986        // Trim history
987        if runs.len() > self.max_history {
988            runs.remove(0);
989        }
990    }
991
992    async fn update_run(
993        &self,
994        run_id: &str,
995        status: RunStatus,
996        error: Option<String>,
997        results: Vec<ActionResult>,
998    ) {
999        let mut runs = self.runs.write().await;
1000        if let Some(run) = runs.iter_mut().find(|r| r.id == run_id) {
1001            run.status = status;
1002            run.ended_at = Some(Utc::now());
1003            run.error = error;
1004            run.results = results;
1005        }
1006    }
1007
1008    async fn update_workflow_stats(&self, workflow_id: &str) {
1009        let mut workflows = self.workflows.write().await;
1010        if let Some(workflow) = workflows.get_mut(workflow_id) {
1011            workflow.last_run = Some(Utc::now());
1012            workflow.run_count += 1;
1013        }
1014    }
1015
1016    /// Get run history
1017    pub async fn get_runs(&self, workflow_id: Option<&str>, limit: usize) -> Vec<WorkflowRun> {
1018        let runs = self.runs.read().await;
1019        runs.iter()
1020            .filter(|r| workflow_id.map(|id| r.workflow_id == id).unwrap_or(true))
1021            .rev()
1022            .take(limit)
1023            .cloned()
1024            .collect()
1025    }
1026
1027    /// Get specific run
1028    pub async fn get_run(&self, run_id: &str) -> Option<WorkflowRun> {
1029        self.runs
1030            .read()
1031            .await
1032            .iter()
1033            .find(|r| r.id == run_id)
1034            .cloned()
1035    }
1036}
1037
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the fixture used by the engine test: an enabled, manually
    /// triggered workflow with no conditions and a single `Log` action.
    fn manual_log_workflow() -> Workflow {
        let log_action = Action::Log {
            level: LogLevel::Info,
            message: String::from("Test action executed"),
        };

        Workflow {
            id: String::from("test-workflow"),
            name: String::from("Test Workflow"),
            description: None,
            enabled: true,
            triggers: vec![Trigger::Manual],
            conditions: Vec::new(),
            actions: vec![log_action],
            on_error: ErrorStrategy::Stop,
            created_at: Utc::now(),
            updated_at: Utc::now(),
            last_run: None,
            run_count: 0,
        }
    }

    /// Registering a workflow and triggering it manually must leave a run
    /// with status `Completed` in the history.
    #[tokio::test]
    async fn test_automation_engine() {
        let engine = AutomationEngine::new(100);
        let workflow = manual_log_workflow();

        engine.register(workflow).await.unwrap();
        let run_id = engine.trigger("test-workflow", None).await.unwrap();

        let recorded = engine.get_run(&run_id).await.unwrap();
        assert_eq!(recorded.status, RunStatus::Completed);
    }

    /// A `{{name}}` placeholder is replaced by the variable set on the context.
    #[test]
    fn test_interpolation() {
        let mut ctx = ExecutionContext::new(String::from("test"), None);
        ctx.set_var(String::from("name"), serde_json::json!("World"));

        assert_eq!(ctx.interpolate("Hello, {{name}}!"), "Hello, World!");
    }
}