1use anyhow::{anyhow, Result};
13use chrono::{DateTime, Datelike, Duration, NaiveTime, Timelike, Utc, Weekday};
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::sync::Arc;
17use tokio::sync::RwLock;
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct Workflow {
26 pub id: String,
28 pub name: String,
30 pub description: Option<String>,
32 pub enabled: bool,
34 pub triggers: Vec<Trigger>,
36 pub conditions: Vec<Condition>,
38 pub actions: Vec<Action>,
40 pub on_error: ErrorStrategy,
42 pub created_at: DateTime<Utc>,
44 pub updated_at: DateTime<Utc>,
46 pub last_run: Option<DateTime<Utc>>,
48 pub run_count: u64,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
58#[serde(tag = "type", rename_all = "snake_case")]
59pub enum Trigger {
60 Event {
62 event_type: String,
64 filter: Option<HashMap<String, serde_json::Value>>,
66 },
67 Schedule {
69 cron: String,
71 timezone: Option<String>,
73 },
74 Interval {
76 seconds: u64,
78 },
79 TimeOfDay {
81 time: NaiveTime,
83 days: Option<Vec<Weekday>>,
85 },
86 Manual,
88 Webhook {
90 path: String,
92 methods: Vec<String>,
94 },
95 FileChange {
97 pattern: String,
99 events: Vec<FileChangeEvent>,
101 },
102}
103
104#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
106#[serde(rename_all = "snake_case")]
107pub enum FileChangeEvent {
108 Created,
109 Modified,
110 Deleted,
111 Renamed,
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
120#[serde(tag = "type", rename_all = "snake_case")]
121pub enum Condition {
122 And { conditions: Vec<Condition> },
124 Or { conditions: Vec<Condition> },
126 Not { condition: Box<Condition> },
128 Compare {
130 left: String,
132 operator: CompareOp,
134 right: serde_json::Value,
136 },
137 Exists {
139 path: String,
141 },
142 Matches {
144 value: String,
146 pattern: String,
148 },
149 TimeWindow {
151 start: NaiveTime,
153 end: NaiveTime,
155 days: Option<Vec<Weekday>>,
157 },
158 Expression {
160 expr: String,
162 },
163}
164
165#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
167#[serde(rename_all = "snake_case")]
168pub enum CompareOp {
169 Equals,
170 NotEquals,
171 GreaterThan,
172 GreaterOrEqual,
173 LessThan,
174 LessOrEqual,
175 Contains,
176 StartsWith,
177 EndsWith,
178 In,
179}
180
181#[derive(Debug, Clone, Serialize, Deserialize)]
187#[serde(tag = "type", rename_all = "snake_case")]
188pub enum Action {
189 Export {
191 filter: SessionFilter,
193 format: String,
195 output: String,
197 },
198 Archive {
200 filter: SessionFilter,
202 destination: String,
204 },
205 Delete {
207 filter: SessionFilter,
209 },
210 Tag {
212 filter: SessionFilter,
214 add_tags: Vec<String>,
216 remove_tags: Vec<String>,
218 },
219 Sync {
221 provider: String,
223 direction: SyncDirection,
225 },
226 Harvest {
228 provider: Option<String>,
230 },
231 Plugin {
233 plugin_id: String,
235 action: String,
237 params: HashMap<String, serde_json::Value>,
239 },
240 Notify {
242 channel: NotificationChannel,
244 message: String,
246 title: Option<String>,
248 },
249 Http {
251 url: String,
253 method: String,
255 headers: HashMap<String, String>,
257 body: Option<String>,
259 },
260 Shell {
262 command: String,
264 cwd: Option<String>,
266 env: HashMap<String, String>,
268 },
269 SetVariable {
271 name: String,
273 value: serde_json::Value,
275 },
276 If {
278 condition: Condition,
280 then: Vec<Action>,
282 else_: Option<Vec<Action>>,
284 },
285 ForEach {
287 items: String,
289 as_var: String,
291 actions: Vec<Action>,
293 },
294 Delay {
296 seconds: u64,
298 },
299 Log {
301 level: LogLevel,
303 message: String,
305 },
306}
307
308#[derive(Debug, Clone, Serialize, Deserialize)]
310pub struct SessionFilter {
311 pub provider: Option<String>,
313 pub older_than_days: Option<u32>,
315 pub tags: Option<Vec<String>>,
317 pub query: Option<String>,
319}
320
321#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
323#[serde(rename_all = "snake_case")]
324pub enum SyncDirection {
325 Push,
326 Pull,
327 Both,
328}
329
330#[derive(Debug, Clone, Serialize, Deserialize)]
332#[serde(tag = "type", rename_all = "snake_case")]
333pub enum NotificationChannel {
334 System,
336 Email { to: String },
338 Slack { webhook_url: String },
340 Discord { webhook_url: String },
342 Webhook { url: String },
344}
345
346#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
348#[serde(rename_all = "snake_case")]
349pub enum LogLevel {
350 Debug,
351 Info,
352 Warning,
353 Error,
354}
355
356#[derive(Debug, Clone, Serialize, Deserialize)]
358#[serde(rename_all = "snake_case")]
359pub enum ErrorStrategy {
360 Stop,
362 Continue,
364 Retry {
366 max_attempts: u32,
367 delay_seconds: u64,
368 },
369 Fallback { actions: Vec<Action> },
371}
372
373#[derive(Debug, Clone)]
379pub struct ExecutionContext {
380 pub workflow_id: String,
382 pub run_id: String,
384 pub trigger_event: Option<serde_json::Value>,
386 pub variables: HashMap<String, serde_json::Value>,
388 pub started_at: DateTime<Utc>,
390 pub results: Vec<ActionResult>,
392}
393
394impl ExecutionContext {
395 pub fn new(workflow_id: String, trigger_event: Option<serde_json::Value>) -> Self {
396 Self {
397 workflow_id,
398 run_id: uuid::Uuid::new_v4().to_string(),
399 trigger_event,
400 variables: HashMap::new(),
401 started_at: Utc::now(),
402 results: Vec::new(),
403 }
404 }
405
406 pub fn get_var(&self, name: &str) -> Option<&serde_json::Value> {
408 self.variables.get(name)
409 }
410
411 pub fn set_var(&mut self, name: String, value: serde_json::Value) {
413 self.variables.insert(name, value);
414 }
415
416 pub fn interpolate(&self, template: &str) -> String {
418 let mut result = template.to_string();
419
420 for (key, value) in &self.variables {
421 let placeholder = format!("{{{{{}}}}}", key);
422 let replacement = match value {
423 serde_json::Value::String(s) => s.clone(),
424 other => other.to_string(),
425 };
426 result = result.replace(&placeholder, &replacement);
427 }
428
429 result
430 }
431}
432
433#[derive(Debug, Clone, Serialize, Deserialize)]
435pub struct ActionResult {
436 pub action_index: usize,
438 pub success: bool,
440 pub data: Option<serde_json::Value>,
442 pub error: Option<String>,
444 pub duration_ms: u64,
446 pub executed_at: DateTime<Utc>,
448}
449
450#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
456#[serde(rename_all = "snake_case")]
457pub enum RunStatus {
458 Running,
459 Completed,
460 Failed,
461 Cancelled,
462}
463
464#[derive(Debug, Clone, Serialize, Deserialize)]
466pub struct WorkflowRun {
467 pub id: String,
469 pub workflow_id: String,
471 pub status: RunStatus,
473 pub trigger: String,
475 pub started_at: DateTime<Utc>,
477 pub ended_at: Option<DateTime<Utc>>,
479 pub results: Vec<ActionResult>,
481 pub error: Option<String>,
483}
484
485pub struct AutomationEngine {
487 workflows: Arc<RwLock<HashMap<String, Workflow>>>,
489 runs: Arc<RwLock<Vec<WorkflowRun>>>,
491 max_history: usize,
493}
494
495impl AutomationEngine {
496 pub fn new(max_history: usize) -> Self {
497 Self {
498 workflows: Arc::new(RwLock::new(HashMap::new())),
499 runs: Arc::new(RwLock::new(Vec::new())),
500 max_history,
501 }
502 }
503
504 pub async fn register(&self, workflow: Workflow) -> Result<()> {
506 self.validate_workflow(&workflow)?;
507 self.workflows
508 .write()
509 .await
510 .insert(workflow.id.clone(), workflow);
511 Ok(())
512 }
513
514 fn validate_workflow(&self, workflow: &Workflow) -> Result<()> {
516 if workflow.id.is_empty() {
517 return Err(anyhow!("Workflow ID cannot be empty"));
518 }
519 if workflow.triggers.is_empty() {
520 return Err(anyhow!("Workflow must have at least one trigger"));
521 }
522 if workflow.actions.is_empty() {
523 return Err(anyhow!("Workflow must have at least one action"));
524 }
525 Ok(())
526 }
527
528 pub async fn unregister(&self, workflow_id: &str) -> Result<()> {
530 self.workflows
531 .write()
532 .await
533 .remove(workflow_id)
534 .ok_or_else(|| anyhow!("Workflow not found: {}", workflow_id))?;
535 Ok(())
536 }
537
538 pub async fn get_workflow(&self, workflow_id: &str) -> Option<Workflow> {
540 self.workflows.read().await.get(workflow_id).cloned()
541 }
542
543 pub async fn list_workflows(&self) -> Vec<Workflow> {
545 self.workflows.read().await.values().cloned().collect()
546 }
547
548 pub async fn trigger(
550 &self,
551 workflow_id: &str,
552 event: Option<serde_json::Value>,
553 ) -> Result<String> {
554 let workflow = self
555 .workflows
556 .read()
557 .await
558 .get(workflow_id)
559 .cloned()
560 .ok_or_else(|| anyhow!("Workflow not found: {}", workflow_id))?;
561
562 if !workflow.enabled {
563 return Err(anyhow!("Workflow is disabled"));
564 }
565
566 self.execute_workflow(&workflow, event).await
567 }
568
569 async fn execute_workflow(
571 &self,
572 workflow: &Workflow,
573 event: Option<serde_json::Value>,
574 ) -> Result<String> {
575 let mut ctx = ExecutionContext::new(workflow.id.clone(), event);
576
577 for condition in &workflow.conditions {
579 if !self.evaluate_condition(condition, &ctx).await? {
580 return Err(anyhow!("Workflow conditions not met"));
581 }
582 }
583
584 let run = WorkflowRun {
585 id: ctx.run_id.clone(),
586 workflow_id: workflow.id.clone(),
587 status: RunStatus::Running,
588 trigger: "manual".to_string(),
589 started_at: ctx.started_at,
590 ended_at: None,
591 results: Vec::new(),
592 error: None,
593 };
594
595 self.record_run(run.clone()).await;
596
597 let mut final_status = RunStatus::Completed;
599 let mut final_error = None;
600
601 for (i, action) in workflow.actions.iter().enumerate() {
602 let start = std::time::Instant::now();
603
604 match self.execute_action(action, &mut ctx).await {
605 Ok(data) => {
606 ctx.results.push(ActionResult {
607 action_index: i,
608 success: true,
609 data,
610 error: None,
611 duration_ms: start.elapsed().as_millis() as u64,
612 executed_at: Utc::now(),
613 });
614 }
615 Err(e) => {
616 ctx.results.push(ActionResult {
617 action_index: i,
618 success: false,
619 data: None,
620 error: Some(e.to_string()),
621 duration_ms: start.elapsed().as_millis() as u64,
622 executed_at: Utc::now(),
623 });
624
625 match &workflow.on_error {
626 ErrorStrategy::Stop => {
627 final_status = RunStatus::Failed;
628 final_error = Some(e.to_string());
629 break;
630 }
631 ErrorStrategy::Continue => continue,
632 ErrorStrategy::Retry {
633 max_attempts,
634 delay_seconds,
635 } => {
636 let mut retry_success = false;
638 for _ in 0..*max_attempts {
639 tokio::time::sleep(tokio::time::Duration::from_secs(
640 *delay_seconds,
641 ))
642 .await;
643 if self.execute_action(action, &mut ctx).await.is_ok() {
644 retry_success = true;
645 break;
646 }
647 }
648 if !retry_success {
649 final_status = RunStatus::Failed;
650 final_error = Some(e.to_string());
651 break;
652 }
653 }
654 ErrorStrategy::Fallback { actions } => {
655 for fallback in actions {
656 self.execute_action(fallback, &mut ctx).await.ok();
657 }
658 }
659 }
660 }
661 }
662 }
663
664 self.update_run(&ctx.run_id, final_status, final_error, ctx.results)
666 .await;
667
668 self.update_workflow_stats(&workflow.id).await;
670
671 Ok(ctx.run_id)
672 }
673
674 fn evaluate_condition<'a>(
676 &'a self,
677 condition: &'a Condition,
678 ctx: &'a ExecutionContext,
679 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<bool>> + Send + 'a>> {
680 Box::pin(async move {
681 match condition {
682 Condition::And { conditions } => {
683 for c in conditions {
684 if !self.evaluate_condition(c, ctx).await? {
685 return Ok(false);
686 }
687 }
688 Ok(true)
689 }
690 Condition::Or { conditions } => {
691 for c in conditions {
692 if self.evaluate_condition(c, ctx).await? {
693 return Ok(true);
694 }
695 }
696 Ok(false)
697 }
698 Condition::Not { condition } => {
699 Ok(!self.evaluate_condition(condition, ctx).await?)
700 }
701 Condition::Compare {
702 left,
703 operator,
704 right,
705 } => {
706 let left_val = ctx.interpolate(left);
707 self.compare_values(&left_val, operator, right)
708 }
709 Condition::Exists { path } => Ok(ctx.variables.contains_key(path)),
710 Condition::Matches { value, pattern } => {
711 let val = ctx.interpolate(value);
712 let re = regex::Regex::new(pattern)?;
713 Ok(re.is_match(&val))
714 }
715 Condition::TimeWindow { start, end, days } => {
716 let now = Utc::now();
717 let current_time = now.time();
718 let current_day = now.weekday();
719
720 if let Some(valid_days) = days {
722 if !valid_days.contains(¤t_day) {
723 return Ok(false);
724 }
725 }
726
727 if start <= end {
729 Ok(current_time >= *start && current_time <= *end)
730 } else {
731 Ok(current_time >= *start || current_time <= *end)
733 }
734 }
735 Condition::Expression { expr: _ } => {
736 Ok(true)
738 }
739 }
740 })
741 }
742
743 fn compare_values(
744 &self,
745 left: &str,
746 op: &CompareOp,
747 right: &serde_json::Value,
748 ) -> Result<bool> {
749 match op {
750 CompareOp::Equals => {
751 if let serde_json::Value::String(s) = right {
752 Ok(left == s)
753 } else {
754 Ok(left == right.to_string())
755 }
756 }
757 CompareOp::NotEquals => {
758 if let serde_json::Value::String(s) = right {
759 Ok(left != s)
760 } else {
761 Ok(left != right.to_string())
762 }
763 }
764 CompareOp::Contains => {
765 if let serde_json::Value::String(s) = right {
766 Ok(left.contains(s.as_str()))
767 } else {
768 Ok(false)
769 }
770 }
771 CompareOp::StartsWith => {
772 if let serde_json::Value::String(s) = right {
773 Ok(left.starts_with(s.as_str()))
774 } else {
775 Ok(false)
776 }
777 }
778 CompareOp::EndsWith => {
779 if let serde_json::Value::String(s) = right {
780 Ok(left.ends_with(s.as_str()))
781 } else {
782 Ok(false)
783 }
784 }
785 CompareOp::In => {
786 if let serde_json::Value::Array(arr) = right {
787 for item in arr {
788 if let serde_json::Value::String(s) = item {
789 if left == s {
790 return Ok(true);
791 }
792 }
793 }
794 }
795 Ok(false)
796 }
797 _ => {
798 let left_num: f64 = left.parse()?;
800 let right_num = match right {
801 serde_json::Value::Number(n) => n.as_f64().unwrap_or(0.0),
802 _ => right.to_string().parse()?,
803 };
804
805 Ok(match op {
806 CompareOp::GreaterThan => left_num > right_num,
807 CompareOp::GreaterOrEqual => left_num >= right_num,
808 CompareOp::LessThan => left_num < right_num,
809 CompareOp::LessOrEqual => left_num <= right_num,
810 _ => false,
811 })
812 }
813 }
814 }
815
816 fn execute_action<'a>(
818 &'a self,
819 action: &'a Action,
820 ctx: &'a mut ExecutionContext,
821 ) -> std::pin::Pin<
822 Box<dyn std::future::Future<Output = Result<Option<serde_json::Value>>> + Send + 'a>,
823 > {
824 Box::pin(async move {
825 match action {
826 Action::Export {
827 filter,
828 format,
829 output,
830 } => {
831 log::info!(
833 "Exporting sessions: filter={:?}, format={}, output={}",
834 filter,
835 format,
836 output
837 );
838 Ok(Some(serde_json::json!({ "exported": true })))
839 }
840 Action::Archive {
841 filter,
842 destination,
843 } => {
844 log::info!(
845 "Archiving sessions: filter={:?}, destination={}",
846 filter,
847 destination
848 );
849 Ok(Some(serde_json::json!({ "archived": true })))
850 }
851 Action::Delete { filter } => {
852 log::info!("Deleting sessions: filter={:?}", filter);
853 Ok(Some(serde_json::json!({ "deleted": true })))
854 }
855 Action::Tag {
856 filter,
857 add_tags,
858 remove_tags,
859 } => {
860 log::info!(
861 "Tagging sessions: filter={:?}, add={:?}, remove={:?}",
862 filter,
863 add_tags,
864 remove_tags
865 );
866 Ok(Some(serde_json::json!({ "tagged": true })))
867 }
868 Action::Sync {
869 provider,
870 direction,
871 } => {
872 log::info!(
873 "Syncing with provider {}: direction={:?}",
874 provider,
875 direction
876 );
877 Ok(Some(serde_json::json!({ "synced": true })))
878 }
879 Action::Harvest { provider } => {
880 log::info!("Harvesting from provider: {:?}", provider);
881 Ok(Some(serde_json::json!({ "harvested": true })))
882 }
883 Action::Plugin {
884 plugin_id,
885 action,
886 params,
887 } => {
888 log::info!(
889 "Executing plugin {}: action={}, params={:?}",
890 plugin_id,
891 action,
892 params
893 );
894 Ok(Some(serde_json::json!({ "plugin_executed": true })))
895 }
896 Action::Notify {
897 channel,
898 message,
899 title,
900 } => {
901 let msg = ctx.interpolate(message);
902 log::info!(
903 "Sending notification: channel={:?}, title={:?}, message={}",
904 channel,
905 title,
906 msg
907 );
908 Ok(Some(serde_json::json!({ "notified": true })))
909 }
910 Action::Http {
911 url,
912 method,
913 headers: _,
914 body: _,
915 } => {
916 let url = ctx.interpolate(url);
917 log::info!("HTTP request: {} {}", method, url);
918 Ok(Some(serde_json::json!({ "status": 200 })))
920 }
921 Action::Shell { command, cwd, env: _ } => {
922 let cmd = ctx.interpolate(command);
923 log::info!("Executing shell: {} (cwd={:?})", cmd, cwd);
924 Ok(Some(serde_json::json!({ "exit_code": 0 })))
926 }
927 Action::SetVariable { name, value } => {
928 ctx.set_var(name.clone(), value.clone());
929 Ok(None)
930 }
931 Action::If {
932 condition,
933 then,
934 else_,
935 } => {
936 if self.evaluate_condition(condition, ctx).await? {
937 for action in then {
938 self.execute_action(action, ctx).await?;
939 }
940 } else if let Some(else_actions) = else_ {
941 for action in else_actions {
942 self.execute_action(action, ctx).await?;
943 }
944 }
945 Ok(None)
946 }
947 Action::ForEach {
948 items,
949 as_var,
950 actions,
951 } => {
952 if let Some(arr) = ctx.get_var(items) {
953 if let serde_json::Value::Array(items_arr) = arr.clone() {
954 for item in items_arr {
955 ctx.set_var(as_var.clone(), item);
956 for action in actions {
957 self.execute_action(action, ctx).await?;
958 }
959 }
960 }
961 }
962 Ok(None)
963 }
964 Action::Delay { seconds } => {
965 tokio::time::sleep(tokio::time::Duration::from_secs(*seconds)).await;
966 Ok(None)
967 }
968 Action::Log { level, message } => {
969 let msg = ctx.interpolate(message);
970 match level {
971 LogLevel::Debug => log::debug!("{}", msg),
972 LogLevel::Info => log::info!("{}", msg),
973 LogLevel::Warning => log::warn!("{}", msg),
974 LogLevel::Error => log::error!("{}", msg),
975 }
976 Ok(None)
977 }
978 }
979 })
980 }
981
982 async fn record_run(&self, run: WorkflowRun) {
983 let mut runs = self.runs.write().await;
984 runs.push(run);
985
986 if runs.len() > self.max_history {
988 runs.remove(0);
989 }
990 }
991
992 async fn update_run(
993 &self,
994 run_id: &str,
995 status: RunStatus,
996 error: Option<String>,
997 results: Vec<ActionResult>,
998 ) {
999 let mut runs = self.runs.write().await;
1000 if let Some(run) = runs.iter_mut().find(|r| r.id == run_id) {
1001 run.status = status;
1002 run.ended_at = Some(Utc::now());
1003 run.error = error;
1004 run.results = results;
1005 }
1006 }
1007
1008 async fn update_workflow_stats(&self, workflow_id: &str) {
1009 let mut workflows = self.workflows.write().await;
1010 if let Some(workflow) = workflows.get_mut(workflow_id) {
1011 workflow.last_run = Some(Utc::now());
1012 workflow.run_count += 1;
1013 }
1014 }
1015
1016 pub async fn get_runs(&self, workflow_id: Option<&str>, limit: usize) -> Vec<WorkflowRun> {
1018 let runs = self.runs.read().await;
1019 runs.iter()
1020 .filter(|r| workflow_id.map(|id| r.workflow_id == id).unwrap_or(true))
1021 .rev()
1022 .take(limit)
1023 .cloned()
1024 .collect()
1025 }
1026
1027 pub async fn get_run(&self, run_id: &str) -> Option<WorkflowRun> {
1029 self.runs
1030 .read()
1031 .await
1032 .iter()
1033 .find(|r| r.id == run_id)
1034 .cloned()
1035 }
1036}
1037
1038#[cfg(test)]
1039mod tests {
1040 use super::*;
1041
1042 #[tokio::test]
1043 async fn test_automation_engine() {
1044 let engine = AutomationEngine::new(100);
1045
1046 let workflow = Workflow {
1047 id: "test-workflow".to_string(),
1048 name: "Test Workflow".to_string(),
1049 description: None,
1050 enabled: true,
1051 triggers: vec![Trigger::Manual],
1052 conditions: vec![],
1053 actions: vec![Action::Log {
1054 level: LogLevel::Info,
1055 message: "Test action executed".to_string(),
1056 }],
1057 on_error: ErrorStrategy::Stop,
1058 created_at: Utc::now(),
1059 updated_at: Utc::now(),
1060 last_run: None,
1061 run_count: 0,
1062 };
1063
1064 engine.register(workflow).await.unwrap();
1065 let run_id = engine.trigger("test-workflow", None).await.unwrap();
1066
1067 let run = engine.get_run(&run_id).await.unwrap();
1068 assert_eq!(run.status, RunStatus::Completed);
1069 }
1070
1071 #[test]
1072 fn test_interpolation() {
1073 let mut ctx = ExecutionContext::new("test".to_string(), None);
1074 ctx.set_var("name".to_string(), serde_json::json!("World"));
1075
1076 let result = ctx.interpolate("Hello, {{name}}!");
1077 assert_eq!(result, "Hello, World!");
1078 }
1079}