greentic_runner_host/runner/
adapt_messaging.rs

1use anyhow::{Result, bail};
2use axum::extract::Json;
3use axum::http::StatusCode;
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use serde_json::{Value, json};
7
8use crate::engine::runtime::IngressEnvelope;
9use crate::ingress::{
10    ProviderIds, build_canonical_payload, canonical_session_key, default_metadata, empty_entities,
11};
12use crate::provider_core_only;
13use crate::routing::TenantRuntimeHandle;
14use crate::runtime::TenantRuntime;
15
16#[derive(Debug, Serialize, Deserialize)]
17pub struct TelegramUpdate {
18    update_id: i64,
19    #[serde(default)]
20    message: Option<TelegramMessage>,
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub(crate) struct TelegramMessage {
25    #[serde(default)]
26    text: Option<String>,
27    chat: TelegramChat,
28    #[serde(default)]
29    from: Option<TelegramUser>,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub(crate) struct TelegramChat {
34    id: i64,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub(crate) struct TelegramUser {
39    id: i64,
40}
41
42pub async fn telegram_webhook(
43    TenantRuntimeHandle { tenant, runtime }: TenantRuntimeHandle,
44    Json(update): Json<TelegramUpdate>,
45) -> StatusCode {
46    if provider_core_only::is_enabled() {
47        tracing::warn!(
48            update_id = update.update_id,
49            "provider-core only mode enabled; blocking telegram webhook"
50        );
51        return StatusCode::NOT_IMPLEMENTED;
52    }
53
54    if let Some(status) = {
55        let mut cache = runtime.telegram_cache().lock();
56        cache.get(&update.update_id).copied()
57    } {
58        tracing::debug!(
59            update_id = update.update_id,
60            status = %status,
61            "duplicate telegram update skipped"
62        );
63        return status;
64    }
65
66    let message = match update.message.as_ref() {
67        Some(msg) => msg,
68        None => {
69            tracing::debug!(update_id = update.update_id, "no message payload in update");
70            return remember_status(runtime.as_ref(), update.update_id, StatusCode::NO_CONTENT);
71        }
72    };
73
74    let text = match message.text.as_ref() {
75        Some(text) if !text.trim().is_empty() => text.clone(),
76        _ => {
77            tracing::debug!(update_id = update.update_id, "ignoring non-text message");
78            return remember_status(runtime.as_ref(), update.update_id, StatusCode::NO_CONTENT);
79        }
80    };
81
82    let engine = runtime.engine();
83    let flow = match engine.flow_by_type("messaging") {
84        Some(flow) => flow,
85        None => {
86            tracing::error!("no messaging flow registered in pack");
87            return StatusCode::NOT_FOUND;
88        }
89    };
90
91    let raw_value = serde_json::to_value(&update).unwrap_or(Value::Null);
92    let mapped = map_telegram_activity(&tenant, message, &text, update.update_id, raw_value);
93
94    let envelope = IngressEnvelope {
95        tenant: tenant.clone(),
96        env: None,
97        flow_id: flow.id.clone(),
98        flow_type: Some(flow.flow_type.clone()),
99        action: Some("messaging".into()),
100        session_hint: Some(mapped.session_key.clone()),
101        provider: Some("telegram".into()),
102        channel: mapped.channel.clone(),
103        conversation: mapped.conversation.clone(),
104        user: mapped.user.clone(),
105        activity_id: mapped.provider_ids.message_id.clone(),
106        timestamp: Some(mapped.timestamp.to_rfc3339()),
107        payload: mapped.payload,
108        metadata: None,
109    }
110    .canonicalize();
111
112    match runtime.state_machine().handle(envelope).await {
113        Ok(response) => {
114            let replies = collect_text_responses(&response);
115            if replies.is_empty() {
116                tracing::info!(
117                    flow_id = %flow.id,
118                    update_id = update.update_id,
119                    "flow completed without telegram replies"
120                );
121                return remember_status(runtime.as_ref(), update.update_id, StatusCode::NO_CONTENT);
122            }
123
124            for text in &replies {
125                if let Err(err) =
126                    send_telegram_message(runtime.as_ref(), message.chat.id, text).await
127                {
128                    tracing::error!(
129                        flow_id = %flow.id,
130                        update_id = update.update_id,
131                        error = %err,
132                        "failed to send telegram message"
133                    );
134                    return remember_status(
135                        runtime.as_ref(),
136                        update.update_id,
137                        StatusCode::BAD_GATEWAY,
138                    );
139                }
140            }
141
142            tracing::info!(
143                flow_id = %flow.id,
144                update_id = update.update_id,
145                replies = replies.len(),
146                "flow completed"
147            );
148            remember_status(runtime.as_ref(), update.update_id, StatusCode::OK)
149        }
150        Err(err) => {
151            let chained = err.chain().map(|e| e.to_string()).collect::<Vec<_>>();
152            tracing::error!(
153                flow_id = %flow.id,
154                update_id = update.update_id,
155                error.cause_chain = ?chained,
156                "flow execution failed"
157            );
158            remember_status(
159                runtime.as_ref(),
160                update.update_id,
161                StatusCode::INTERNAL_SERVER_ERROR,
162            )
163        }
164    }
165}
166
167async fn send_telegram_message(runtime: &TenantRuntime, chat_id: i64, text: &str) -> Result<()> {
168    if !runtime.messaging_rate().lock().try_acquire() {
169        bail!("messaging send rate exceeded");
170    }
171
172    let token = runtime.get_secret("TELEGRAM_BOT_TOKEN")?;
173    let url = format!("https://api.telegram.org/bot{token}/sendMessage");
174    let body = json!({
175        "chat_id": chat_id,
176        "text": text,
177        "parse_mode": "MarkdownV2",
178    });
179    runtime
180        .http_client()
181        .post(url)
182        .json(&body)
183        .send()
184        .await?
185        .error_for_status()?;
186    Ok(())
187}
188
189fn remember_status(runtime: &TenantRuntime, update_id: i64, status: StatusCode) -> StatusCode {
190    let mut cache = runtime.telegram_cache().lock();
191    cache.put(update_id, status);
192    status
193}
194
195fn collect_text_responses(value: &serde_json::Value) -> Vec<String> {
196    match value {
197        serde_json::Value::Null => Vec::new(),
198        serde_json::Value::String(text) => vec![text.to_owned()],
199        serde_json::Value::Array(items) => {
200            let mut replies = Vec::new();
201            for item in items {
202                replies.extend(collect_text_responses(item));
203            }
204            replies
205        }
206        serde_json::Value::Object(map) => {
207            if let Some(messages) = map.get("messages").and_then(|v| v.as_array()) {
208                let mut replies = Vec::new();
209                for entry in messages {
210                    replies.extend(collect_text_responses(entry));
211                }
212                return replies;
213            }
214            map.get("text")
215                .and_then(|v| v.as_str())
216                .map(|text| vec![text.to_owned()])
217                .unwrap_or_default()
218        }
219        _ => Vec::new(),
220    }
221}
222
223struct MappedTelegram {
224    provider_ids: ProviderIds,
225    session_key: String,
226    timestamp: DateTime<Utc>,
227    payload: Value,
228    channel: Option<String>,
229    conversation: Option<String>,
230    user: Option<String>,
231}
232
233fn map_telegram_activity(
234    tenant: &str,
235    message: &TelegramMessage,
236    text: &str,
237    update_id: i64,
238    raw: Value,
239) -> MappedTelegram {
240    let chat_id = message.chat.id.to_string();
241    let user = message.from.as_ref().map(|user| user.id.to_string());
242    let provider_ids = ProviderIds {
243        channel_id: Some(chat_id.clone()),
244        conversation_id: Some(chat_id.clone()),
245        user_id: user.clone(),
246        message_id: Some(update_id.to_string()),
247        event_id: Some(update_id.to_string()),
248        ..ProviderIds::default()
249    };
250    let timestamp = Utc::now();
251    let session_key = canonical_session_key(tenant, "telegram", &provider_ids);
252    let payload = build_canonical_payload(
253        tenant,
254        "telegram",
255        &provider_ids,
256        session_key.clone(),
257        &["chat".into()],
258        timestamp,
259        None,
260        Some(text.to_string()),
261        Vec::new(),
262        Vec::new(),
263        empty_entities(),
264        default_metadata(),
265        json!({ "chat_id": message.chat.id }),
266        raw,
267    );
268
269    MappedTelegram {
270        provider_ids,
271        session_key,
272        timestamp,
273        payload,
274        channel: Some(chat_id.clone()),
275        conversation: Some(chat_id),
276        user,
277    }
278}
279
280#[cfg(test)]
281mod tests {
282    use super::*;
283    use serde_json::json;
284
285    #[test]
286    fn collect_text_from_array_and_objects() {
287        let payload = json!([
288            { "text": "hello" },
289            { "messages": [{ "text": "nested" }, "raw"] },
290            null,
291            "world"
292        ]);
293        let replies = collect_text_responses(&payload);
294        assert_eq!(replies, vec!["hello", "nested", "raw", "world"]);
295    }
296
297    #[test]
298    fn collect_text_from_single_object() {
299        let payload = json!({ "text": "only" });
300        let replies = collect_text_responses(&payload);
301        assert_eq!(replies, vec!["only"]);
302    }
303
304    #[test]
305    fn telegram_activity_maps_to_canonical_payload() {
306        let update = TelegramUpdate {
307            update_id: 42,
308            message: Some(TelegramMessage {
309                text: Some("Hello".into()),
310                chat: TelegramChat { id: 123 },
311                from: Some(TelegramUser { id: 777 }),
312            }),
313        };
314        let message = update.message.clone().unwrap();
315        let raw = serde_json::to_value(&update).unwrap();
316        let mapped = map_telegram_activity("demo", &message, "Hello", update.update_id, raw);
317        assert_eq!(mapped.session_key, "demo:telegram:123:777");
318        assert_eq!(mapped.provider_ids.conversation_id.as_deref(), Some("123"));
319        assert_eq!(mapped.provider_ids.user_id.as_deref(), Some("777"));
320        assert_eq!(mapped.payload["provider"], json!("telegram"));
321        assert_eq!(mapped.payload["text"], json!("Hello"));
322    }
323}