Skip to main content

aagt_core/infra/
notifications.rs

1//! Notification steps for AAGT pipelines.
2//!
3//! This module provides ready-to-use pipeline steps for sending notifications
4//! via Telegram, Discord, and Email (via webhook/API).
5
6use anyhow::Result;
7#[cfg(feature = "trading")]
8use crate::trading::pipeline::{Context, Step};
9use async_trait::async_trait;
10use serde_json::json;
11use std::fmt::Debug;
12
13// --- Telegram Notification ---
14
/// A step that sends a message to a Telegram chat using a bot token.
///
/// `Debug` is implemented by hand rather than derived so that the bot token
/// never ends up in logs or panic messages.
pub struct TelegramStep {
    /// Bot token embedded in the Telegram API URL; treated as a secret.
    bot_token: String,
    /// Target chat, sent as `chat_id` in the request body.
    chat_id: String,
    /// Message template; `{key}` placeholders are filled in when the step runs.
    message_template: String,
}

impl std::fmt::Debug for TelegramStep {
    /// Formats like the derived implementation, but with `bot_token` redacted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TelegramStep")
            .field("bot_token", &"<redacted>")
            .field("chat_id", &self.chat_id)
            .field("message_template", &self.message_template)
            .finish()
    }
}
22
23impl TelegramStep {
24    /// Create a new Telegram notification step
25    /// 
26    /// `message_template` can contain placeholders like `{key}` which will be replaced
27    /// by values from `context.data` if they exist and are strings/numbers.
28    pub fn new(bot_token: impl Into<String>, chat_id: impl Into<String>, message_template: impl Into<String>) -> Self {
29        Self {
30            bot_token: bot_token.into(),
31            chat_id: chat_id.into(),
32            message_template: message_template.into(),
33        }
34    }
35
36    #[cfg(feature = "trading")]
37    fn format_message(&self, ctx: &Context) -> String {
38        let mut msg = self.message_template.clone();
39        // Simple interpolation: replace {key} with value from ctx.data
40        for (key, value) in &ctx.data {
41             let placeholder = format!("{{{}}}", key);
42             if msg.contains(&placeholder) {
43                 if let Some(s) = value.as_str() {
44                     msg = msg.replace(&placeholder, s);
45                 } else {
46                     msg = msg.replace(&placeholder, &value.to_string());
47                 }
48             }
49        }
50        // Also replace {input} and {outcome}
51        msg = msg.replace("{input}", &ctx.input);
52        if let Some(outcome) = &ctx.outcome {
53             msg = msg.replace("{outcome}", outcome);
54        }
55        msg
56    }
57}
58
#[cfg(feature = "trading")]
#[async_trait]
impl Step for TelegramStep {
    /// Render the template and POST it to the Telegram `sendMessage` endpoint.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be sent or if Telegram answers
    /// with a non-success HTTP status. The request URL is stripped from the
    /// error because it contains the bot token.
    async fn execute(&self, ctx: &mut Context) -> Result<()> {
        let text = self.format_message(ctx);
        let url = format!("https://api.telegram.org/bot{}/sendMessage", self.bot_token);

        // NOTE(review): values are interpolated unescaped while `parse_mode`
        // is `Markdown`; Telegram may reject text with unbalanced markup. That
        // now surfaces as an error instead of being silently ignored.
        let payload = json!({
            "chat_id": self.chat_id,
            "text": text,
            "parse_mode": "Markdown"
        });

        reqwest::Client::new()
            .post(&url)
            .json(&payload)
            .send()
            .await
            // A 4xx/5xx reply means nothing was delivered; treat it as failure.
            .and_then(|response| response.error_for_status())
            // reqwest errors print the request URL, which embeds the token.
            .map_err(|err| err.without_url())?;

        ctx.log(format!("Sent Telegram notification to {}", self.chat_id));
        Ok(())
    }

    /// Name under which this step is reported.
    fn name(&self) -> &str {
        "TelegramNotification"
    }
}
84
85// --- Discord Notification ---
86
/// A step that sends a message to a Discord channel via Webhook.
///
/// `Debug` is implemented by hand rather than derived: a Discord webhook URL
/// embeds the webhook token, so it is redacted from debug output.
pub struct DiscordStep {
    /// Full webhook URL the message is POSTed to; treated as a secret.
    webhook_url: String,
    /// Optional display name, sent as `username` in the payload when set.
    username: Option<String>,
    /// Optional avatar image URL, sent as `avatar_url` in the payload when set.
    avatar_url: Option<String>,
    /// Message template; `{key}` placeholders are filled in when the step runs.
    message_template: String,
}

impl std::fmt::Debug for DiscordStep {
    /// Formats like the derived implementation, but with `webhook_url` redacted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DiscordStep")
            .field("webhook_url", &"<redacted>")
            .field("username", &self.username)
            .field("avatar_url", &self.avatar_url)
            .field("message_template", &self.message_template)
            .finish()
    }
}
95
96impl DiscordStep {
97    pub fn new(webhook_url: impl Into<String>, message_template: impl Into<String>) -> Self {
98        Self {
99            webhook_url: webhook_url.into(),
100            message_template: message_template.into(),
101            username: None,
102            avatar_url: None,
103        }
104    }
105
106    pub fn username(mut self, name: impl Into<String>) -> Self {
107        self.username = Some(name.into());
108        self
109    }
110    
111    #[cfg(feature = "trading")]
112    fn format_message(&self, ctx: &Context) -> String {
113        // Reuse logic or abstract it later. For now, duplication is fine for simplicity.
114        let mut msg = self.message_template.clone();
115        for (key, value) in &ctx.data {
116             let placeholder = format!("{{{}}}", key);
117             if msg.contains(&placeholder) {
118                 if let Some(s) = value.as_str() {
119                     msg = msg.replace(&placeholder, s);
120                 } else {
121                     msg = msg.replace(&placeholder, &value.to_string());
122                 }
123             }
124        }
125        msg = msg.replace("{input}", &ctx.input);
126        if let Some(outcome) = &ctx.outcome {
127             msg = msg.replace("{outcome}", outcome);
128        }
129        msg
130    }
131}
132
#[cfg(feature = "trading")]
#[async_trait]
impl Step for DiscordStep {
    /// Render the template and POST it to the configured Discord webhook.
    ///
    /// `username` and `avatar_url` are added to the payload only when set.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be sent or if the webhook
    /// answers with a non-success HTTP status. The request URL is stripped
    /// from the error because the webhook URL itself is a credential.
    async fn execute(&self, ctx: &mut Context) -> Result<()> {
        let mut payload = json!({
            "content": self.format_message(ctx)
        });

        if let Some(name) = &self.username {
            payload["username"] = json!(name);
        }
        if let Some(avatar) = &self.avatar_url {
            payload["avatar_url"] = json!(avatar);
        }

        reqwest::Client::new()
            .post(&self.webhook_url)
            .json(&payload)
            .send()
            .await
            // A 4xx/5xx reply means nothing was delivered; treat it as failure.
            .and_then(|response| response.error_for_status())
            // reqwest errors print the request URL, i.e. the secret webhook.
            .map_err(|err| err.without_url())?;

        ctx.log("Sent Discord notification");
        Ok(())
    }

    /// Name under which this step is reported.
    fn name(&self) -> &str {
        "DiscordNotification"
    }
}
164
165// --- Email Notification (Generic Webhook) ---
166
/// A step that sends an email via a generic HTTP API (like SendGrid/Mailgun).
/// Since SMTP is heavy, we recommend using HTTP APIs for agents.
///
/// `Debug` is implemented by hand rather than derived so that the API key
/// never ends up in logs or panic messages.
pub struct EmailStep {
    /// Endpoint the send request is POSTed to.
    api_url: String,
    /// Provider credential; treated as a secret.
    api_key: String,
    /// Recipient address.
    to: String,
    /// Subject line of the email.
    subject: String,
    /// Which provider's request format and authentication to use.
    provider: EmailProvider,
}

impl std::fmt::Debug for EmailStep {
    /// Formats like the derived implementation, but with `api_key` redacted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EmailStep")
            .field("api_url", &self.api_url)
            .field("api_key", &"<redacted>")
            .field("to", &self.to)
            .field("subject", &self.subject)
            .field("provider", &self.provider)
            .finish()
    }
}

/// Email delivery backends understood by [`EmailStep`].
#[derive(Debug)]
pub enum EmailProvider {
    /// Mailgun HTTP API for the given sending domain.
    Mailgun { domain: String },
    /// SendGrid v3 mail-send API.
    SendGrid,
    /// Generic webhook; assumes a generic POST `{to, subject, body}`.
    CustomWebhook,
}
184
185impl EmailStep {
186    pub fn new_mailgun(api_key: &str, domain: &str, to: &str, subject: &str) -> Self {
187        Self {
188            api_url: format!("https://api.mailgun.net/v3/{}/messages", domain),
189            api_key: api_key.to_string(),
190            to: to.to_string(),
191            subject: subject.to_string(),
192            provider: EmailProvider::Mailgun { domain: domain.to_string() },
193        }
194    }
195
196    pub fn new_sendgrid(api_key: &str, to: &str, subject: &str) -> Self {
197         Self {
198            api_url: "https://api.sendgrid.com/v3/mail/send".to_string(),
199            api_key: api_key.to_string(),
200            to: to.to_string(),
201            subject: subject.to_string(),
202            provider: EmailProvider::SendGrid,
203        }
204    }
205}
206
#[cfg(feature = "trading")]
#[async_trait]
impl Step for EmailStep {
    /// Send a plain-text pipeline report to the configured recipient.
    ///
    /// The body contains the context's input, outcome and data in their
    /// formatted (`{}` / `{:?}`) form. Mailgun is called with HTTP basic auth
    /// and a form body; SendGrid with a bearer token and a JSON body.
    /// `CustomWebhook` is not implemented: it only logs that fact and returns
    /// `Ok(())` without claiming that an email was sent.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be sent or if the provider
    /// answers with a non-success HTTP status.
    async fn execute(&self, ctx: &mut Context) -> Result<()> {
        let body = format!(
            "Pipeline Report:\n\nInput: {}\nOutcome: {:?}\n\nData: {:?}",
            ctx.input, ctx.outcome, ctx.data
        );

        let client = reqwest::Client::new();

        match &self.provider {
            EmailProvider::Mailgun { .. } => {
                client
                    .post(&self.api_url)
                    .basic_auth("api", Some(&self.api_key))
                    .form(&[
                        ("from", "AAGT Agent <agent@aagt.dev>"),
                        ("to", self.to.as_str()),
                        ("subject", self.subject.as_str()),
                        ("text", body.as_str()),
                    ])
                    .send()
                    .await?
                    // A 4xx/5xx reply means the mail was not accepted.
                    .error_for_status()?;
            }
            EmailProvider::SendGrid => {
                let payload = json!({
                    "personalizations": [{"to": [{"email": self.to}]}],
                    "from": {"email": "agent@aagt.dev"},
                    "subject": self.subject,
                    "content": [{"type": "text/plain", "value": body}]
                });

                client
                    .post(&self.api_url)
                    .header("Authorization", format!("Bearer {}", self.api_key))
                    .json(&payload)
                    .send()
                    .await?
                    // A 4xx/5xx reply means the mail was not accepted.
                    .error_for_status()?;
            }
            // Listed explicitly (no `_`) so adding a provider variant forces
            // this match to be revisited.
            EmailProvider::CustomWebhook => {
                ctx.log("Custom email provider not implemented yet");
                // Nothing was sent, so skip the success log below.
                return Ok(());
            }
        }

        ctx.log(format!("Sent Email notification to {}", self.to));
        Ok(())
    }

    /// Name under which this step is reported.
    fn name(&self) -> &str {
        "EmailNotification"
    }
}