Skip to main content

greentic_runner_host/runner/
adapt_messaging.rs

1use anyhow::{Result, bail};
2use axum::extract::Json;
3use axum::http::StatusCode;
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use serde_json::{Value, json};
7
8use crate::engine::runtime::IngressEnvelope;
9use crate::ingress::{
10    ProviderIds, build_canonical_payload, canonical_session_key, default_metadata, empty_entities,
11};
12use crate::provider_core_only;
13use crate::routing::TenantRuntimeHandle;
14use crate::runtime::TenantRuntime;
15
16#[derive(Debug, Serialize, Deserialize)]
17pub struct TelegramUpdate {
18    update_id: i64,
19    #[serde(default)]
20    message: Option<TelegramMessage>,
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub(crate) struct TelegramMessage {
25    #[serde(default)]
26    text: Option<String>,
27    chat: TelegramChat,
28    #[serde(default)]
29    from: Option<TelegramUser>,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub(crate) struct TelegramChat {
34    id: i64,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub(crate) struct TelegramUser {
39    id: i64,
40}
41
42pub async fn telegram_webhook(
43    TenantRuntimeHandle { tenant, runtime }: TenantRuntimeHandle,
44    Json(update): Json<TelegramUpdate>,
45) -> StatusCode {
46    if provider_core_only::is_enabled() {
47        tracing::warn!(
48            update_id = update.update_id,
49            "provider-core only mode enabled; blocking telegram webhook"
50        );
51        return StatusCode::NOT_IMPLEMENTED;
52    }
53
54    if let Some(status) = {
55        let mut cache = runtime.telegram_cache().lock();
56        cache.get(&update.update_id).copied()
57    } {
58        tracing::debug!(
59            update_id = update.update_id,
60            status = %status,
61            "duplicate telegram update skipped"
62        );
63        return status;
64    }
65
66    let message = match update.message.as_ref() {
67        Some(msg) => msg,
68        None => {
69            tracing::debug!(update_id = update.update_id, "no message payload in update");
70            return remember_status(runtime.as_ref(), update.update_id, StatusCode::NO_CONTENT);
71        }
72    };
73
74    let text = match message.text.as_ref() {
75        Some(text) if !text.trim().is_empty() => text.clone(),
76        _ => {
77            tracing::debug!(update_id = update.update_id, "ignoring non-text message");
78            return remember_status(runtime.as_ref(), update.update_id, StatusCode::NO_CONTENT);
79        }
80    };
81
82    let engine = runtime.engine();
83    let flow = match engine.flow_by_type("messaging") {
84        Some(flow) => flow,
85        None => {
86            tracing::error!("no messaging flow registered in pack");
87            return StatusCode::NOT_FOUND;
88        }
89    };
90
91    let raw_value = serde_json::to_value(&update).unwrap_or(Value::Null);
92    let mapped = map_telegram_activity(&tenant, message, &text, update.update_id, raw_value);
93
94    let envelope = IngressEnvelope {
95        tenant: tenant.clone(),
96        env: None,
97        pack_id: Some(flow.pack_id.clone()),
98        flow_id: flow.id.clone(),
99        flow_type: Some(flow.flow_type.clone()),
100        action: Some("messaging".into()),
101        session_hint: Some(mapped.session_key.clone()),
102        provider: Some("telegram".into()),
103        channel: mapped.channel.clone(),
104        conversation: mapped.conversation.clone(),
105        user: mapped.user.clone(),
106        activity_id: mapped.provider_ids.message_id.clone(),
107        timestamp: Some(mapped.timestamp.to_rfc3339()),
108        payload: mapped.payload,
109        metadata: None,
110        reply_scope: None,
111    }
112    .canonicalize();
113
114    match runtime.state_machine().handle(envelope).await {
115        Ok(response) => {
116            let replies = collect_text_responses(&response);
117            if replies.is_empty() {
118                tracing::info!(
119                    flow_id = %flow.id,
120                    update_id = update.update_id,
121                    "flow completed without telegram replies"
122                );
123                return remember_status(runtime.as_ref(), update.update_id, StatusCode::NO_CONTENT);
124            }
125
126            for text in &replies {
127                if let Err(err) =
128                    send_telegram_message(runtime.as_ref(), message.chat.id, text).await
129                {
130                    tracing::error!(
131                        flow_id = %flow.id,
132                        update_id = update.update_id,
133                        error = %err,
134                        "failed to send telegram message"
135                    );
136                    return remember_status(
137                        runtime.as_ref(),
138                        update.update_id,
139                        StatusCode::BAD_GATEWAY,
140                    );
141                }
142            }
143
144            tracing::info!(
145                flow_id = %flow.id,
146                update_id = update.update_id,
147                replies = replies.len(),
148                "flow completed"
149            );
150            remember_status(runtime.as_ref(), update.update_id, StatusCode::OK)
151        }
152        Err(err) => {
153            let chained = err.chain().map(|e| e.to_string()).collect::<Vec<_>>();
154            tracing::error!(
155                flow_id = %flow.id,
156                update_id = update.update_id,
157                error.cause_chain = ?chained,
158                "flow execution failed"
159            );
160            remember_status(
161                runtime.as_ref(),
162                update.update_id,
163                StatusCode::INTERNAL_SERVER_ERROR,
164            )
165        }
166    }
167}
168
169async fn send_telegram_message(runtime: &TenantRuntime, chat_id: i64, text: &str) -> Result<()> {
170    if !runtime.messaging_rate().lock().try_acquire() {
171        bail!("messaging send rate exceeded");
172    }
173
174    let token = runtime.get_secret("TELEGRAM_BOT_TOKEN")?;
175    let url = format!("https://api.telegram.org/bot{token}/sendMessage");
176    let body = json!({
177        "chat_id": chat_id,
178        "text": text,
179        "parse_mode": "MarkdownV2",
180    });
181    runtime
182        .http_client()
183        .post(url)
184        .json(&body)
185        .send()
186        .await?
187        .error_for_status()?;
188    Ok(())
189}
190
191fn remember_status(runtime: &TenantRuntime, update_id: i64, status: StatusCode) -> StatusCode {
192    let mut cache = runtime.telegram_cache().lock();
193    cache.put(update_id, status);
194    status
195}
196
197fn collect_text_responses(value: &serde_json::Value) -> Vec<String> {
198    match value {
199        serde_json::Value::Null => Vec::new(),
200        serde_json::Value::String(text) => vec![text.to_owned()],
201        serde_json::Value::Array(items) => {
202            let mut replies = Vec::new();
203            for item in items {
204                replies.extend(collect_text_responses(item));
205            }
206            replies
207        }
208        serde_json::Value::Object(map) => {
209            if let Some(messages) = map.get("messages").and_then(|v| v.as_array()) {
210                let mut replies = Vec::new();
211                for entry in messages {
212                    replies.extend(collect_text_responses(entry));
213                }
214                return replies;
215            }
216            map.get("text")
217                .and_then(|v| v.as_str())
218                .map(|text| vec![text.to_owned()])
219                .unwrap_or_default()
220        }
221        _ => Vec::new(),
222    }
223}
224
225struct MappedTelegram {
226    provider_ids: ProviderIds,
227    session_key: String,
228    timestamp: DateTime<Utc>,
229    payload: Value,
230    channel: Option<String>,
231    conversation: Option<String>,
232    user: Option<String>,
233}
234
235fn map_telegram_activity(
236    tenant: &str,
237    message: &TelegramMessage,
238    text: &str,
239    update_id: i64,
240    raw: Value,
241) -> MappedTelegram {
242    let chat_id = message.chat.id.to_string();
243    let user = message.from.as_ref().map(|user| user.id.to_string());
244    let provider_ids = ProviderIds {
245        channel_id: Some(chat_id.clone()),
246        conversation_id: Some(chat_id.clone()),
247        user_id: user.clone(),
248        message_id: Some(update_id.to_string()),
249        event_id: Some(update_id.to_string()),
250        ..ProviderIds::default()
251    };
252    let timestamp = Utc::now();
253    let session_key = canonical_session_key(tenant, "telegram", &provider_ids);
254    let payload = build_canonical_payload(
255        tenant,
256        "telegram",
257        &provider_ids,
258        session_key.clone(),
259        &["chat".into()],
260        timestamp,
261        None,
262        Some(text.to_string()),
263        Vec::new(),
264        Vec::new(),
265        empty_entities(),
266        default_metadata(),
267        json!({ "chat_id": message.chat.id }),
268        raw,
269    );
270
271    MappedTelegram {
272        provider_ids,
273        session_key,
274        timestamp,
275        payload,
276        channel: Some(chat_id.clone()),
277        conversation: Some(chat_id),
278        user,
279    }
280}
281
282#[cfg(test)]
283mod tests {
284    use super::*;
285    use serde_json::json;
286
287    #[test]
288    fn collect_text_from_array_and_objects() {
289        let payload = json!([
290            { "text": "hello" },
291            { "messages": [{ "text": "nested" }, "raw"] },
292            null,
293            "world"
294        ]);
295        let replies = collect_text_responses(&payload);
296        assert_eq!(replies, vec!["hello", "nested", "raw", "world"]);
297    }
298
299    #[test]
300    fn collect_text_from_single_object() {
301        let payload = json!({ "text": "only" });
302        let replies = collect_text_responses(&payload);
303        assert_eq!(replies, vec!["only"]);
304    }
305
306    #[test]
307    fn telegram_activity_maps_to_canonical_payload() {
308        let update = TelegramUpdate {
309            update_id: 42,
310            message: Some(TelegramMessage {
311                text: Some("Hello".into()),
312                chat: TelegramChat { id: 123 },
313                from: Some(TelegramUser { id: 777 }),
314            }),
315        };
316        let message = update.message.clone().unwrap();
317        let raw = serde_json::to_value(&update).unwrap();
318        let mapped = map_telegram_activity("demo", &message, "Hello", update.update_id, raw);
319        assert_eq!(mapped.session_key, "demo:telegram:123:777");
320        assert_eq!(mapped.provider_ids.conversation_id.as_deref(), Some("123"));
321        assert_eq!(mapped.provider_ids.user_id.as_deref(), Some("777"));
322        assert_eq!(mapped.payload["provider"], json!("telegram"));
323        assert_eq!(mapped.payload["text"], json!("Hello"));
324    }
325}