greentic_runner_host/runner/
adapt_messaging.rs1use anyhow::{Result, bail};
2use axum::extract::Json;
3use axum::http::StatusCode;
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use serde_json::{Value, json};
7
8use crate::engine::runtime::IngressEnvelope;
9use crate::ingress::{
10 ProviderIds, build_canonical_payload, canonical_session_key, default_metadata, empty_entities,
11};
12use crate::provider_core_only;
13use crate::routing::TenantRuntimeHandle;
14use crate::runtime::TenantRuntime;
15
16#[derive(Debug, Serialize, Deserialize)]
17pub struct TelegramUpdate {
18 update_id: i64,
19 #[serde(default)]
20 message: Option<TelegramMessage>,
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub(crate) struct TelegramMessage {
25 #[serde(default)]
26 text: Option<String>,
27 chat: TelegramChat,
28 #[serde(default)]
29 from: Option<TelegramUser>,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub(crate) struct TelegramChat {
34 id: i64,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub(crate) struct TelegramUser {
39 id: i64,
40}
41
42pub async fn telegram_webhook(
43 TenantRuntimeHandle { tenant, runtime }: TenantRuntimeHandle,
44 Json(update): Json<TelegramUpdate>,
45) -> StatusCode {
46 if provider_core_only::is_enabled() {
47 tracing::warn!(
48 update_id = update.update_id,
49 "provider-core only mode enabled; blocking telegram webhook"
50 );
51 return StatusCode::NOT_IMPLEMENTED;
52 }
53
54 if let Some(status) = {
55 let mut cache = runtime.telegram_cache().lock();
56 cache.get(&update.update_id).copied()
57 } {
58 tracing::debug!(
59 update_id = update.update_id,
60 status = %status,
61 "duplicate telegram update skipped"
62 );
63 return status;
64 }
65
66 let message = match update.message.as_ref() {
67 Some(msg) => msg,
68 None => {
69 tracing::debug!(update_id = update.update_id, "no message payload in update");
70 return remember_status(runtime.as_ref(), update.update_id, StatusCode::NO_CONTENT);
71 }
72 };
73
74 let text = match message.text.as_ref() {
75 Some(text) if !text.trim().is_empty() => text.clone(),
76 _ => {
77 tracing::debug!(update_id = update.update_id, "ignoring non-text message");
78 return remember_status(runtime.as_ref(), update.update_id, StatusCode::NO_CONTENT);
79 }
80 };
81
82 let engine = runtime.engine();
83 let flow = match engine.flow_by_type("messaging") {
84 Some(flow) => flow,
85 None => {
86 tracing::error!("no messaging flow registered in pack");
87 return StatusCode::NOT_FOUND;
88 }
89 };
90
91 let raw_value = serde_json::to_value(&update).unwrap_or(Value::Null);
92 let mapped = map_telegram_activity(&tenant, message, &text, update.update_id, raw_value);
93
94 let envelope = IngressEnvelope {
95 tenant: tenant.clone(),
96 env: None,
97 flow_id: flow.id.clone(),
98 flow_type: Some(flow.flow_type.clone()),
99 action: Some("messaging".into()),
100 session_hint: Some(mapped.session_key.clone()),
101 provider: Some("telegram".into()),
102 channel: mapped.channel.clone(),
103 conversation: mapped.conversation.clone(),
104 user: mapped.user.clone(),
105 activity_id: mapped.provider_ids.message_id.clone(),
106 timestamp: Some(mapped.timestamp.to_rfc3339()),
107 payload: mapped.payload,
108 metadata: None,
109 }
110 .canonicalize();
111
112 match runtime.state_machine().handle(envelope).await {
113 Ok(response) => {
114 let replies = collect_text_responses(&response);
115 if replies.is_empty() {
116 tracing::info!(
117 flow_id = %flow.id,
118 update_id = update.update_id,
119 "flow completed without telegram replies"
120 );
121 return remember_status(runtime.as_ref(), update.update_id, StatusCode::NO_CONTENT);
122 }
123
124 for text in &replies {
125 if let Err(err) =
126 send_telegram_message(runtime.as_ref(), message.chat.id, text).await
127 {
128 tracing::error!(
129 flow_id = %flow.id,
130 update_id = update.update_id,
131 error = %err,
132 "failed to send telegram message"
133 );
134 return remember_status(
135 runtime.as_ref(),
136 update.update_id,
137 StatusCode::BAD_GATEWAY,
138 );
139 }
140 }
141
142 tracing::info!(
143 flow_id = %flow.id,
144 update_id = update.update_id,
145 replies = replies.len(),
146 "flow completed"
147 );
148 remember_status(runtime.as_ref(), update.update_id, StatusCode::OK)
149 }
150 Err(err) => {
151 let chained = err.chain().map(|e| e.to_string()).collect::<Vec<_>>();
152 tracing::error!(
153 flow_id = %flow.id,
154 update_id = update.update_id,
155 error.cause_chain = ?chained,
156 "flow execution failed"
157 );
158 remember_status(
159 runtime.as_ref(),
160 update.update_id,
161 StatusCode::INTERNAL_SERVER_ERROR,
162 )
163 }
164 }
165}
166
167async fn send_telegram_message(runtime: &TenantRuntime, chat_id: i64, text: &str) -> Result<()> {
168 if !runtime.messaging_rate().lock().try_acquire() {
169 bail!("messaging send rate exceeded");
170 }
171
172 let token = runtime.get_secret("TELEGRAM_BOT_TOKEN")?;
173 let url = format!("https://api.telegram.org/bot{token}/sendMessage");
174 let body = json!({
175 "chat_id": chat_id,
176 "text": text,
177 "parse_mode": "MarkdownV2",
178 });
179 runtime
180 .http_client()
181 .post(url)
182 .json(&body)
183 .send()
184 .await?
185 .error_for_status()?;
186 Ok(())
187}
188
189fn remember_status(runtime: &TenantRuntime, update_id: i64, status: StatusCode) -> StatusCode {
190 let mut cache = runtime.telegram_cache().lock();
191 cache.put(update_id, status);
192 status
193}
194
195fn collect_text_responses(value: &serde_json::Value) -> Vec<String> {
196 match value {
197 serde_json::Value::Null => Vec::new(),
198 serde_json::Value::String(text) => vec![text.to_owned()],
199 serde_json::Value::Array(items) => {
200 let mut replies = Vec::new();
201 for item in items {
202 replies.extend(collect_text_responses(item));
203 }
204 replies
205 }
206 serde_json::Value::Object(map) => {
207 if let Some(messages) = map.get("messages").and_then(|v| v.as_array()) {
208 let mut replies = Vec::new();
209 for entry in messages {
210 replies.extend(collect_text_responses(entry));
211 }
212 return replies;
213 }
214 map.get("text")
215 .and_then(|v| v.as_str())
216 .map(|text| vec![text.to_owned()])
217 .unwrap_or_default()
218 }
219 _ => Vec::new(),
220 }
221}
222
223struct MappedTelegram {
224 provider_ids: ProviderIds,
225 session_key: String,
226 timestamp: DateTime<Utc>,
227 payload: Value,
228 channel: Option<String>,
229 conversation: Option<String>,
230 user: Option<String>,
231}
232
233fn map_telegram_activity(
234 tenant: &str,
235 message: &TelegramMessage,
236 text: &str,
237 update_id: i64,
238 raw: Value,
239) -> MappedTelegram {
240 let chat_id = message.chat.id.to_string();
241 let user = message.from.as_ref().map(|user| user.id.to_string());
242 let provider_ids = ProviderIds {
243 channel_id: Some(chat_id.clone()),
244 conversation_id: Some(chat_id.clone()),
245 user_id: user.clone(),
246 message_id: Some(update_id.to_string()),
247 event_id: Some(update_id.to_string()),
248 ..ProviderIds::default()
249 };
250 let timestamp = Utc::now();
251 let session_key = canonical_session_key(tenant, "telegram", &provider_ids);
252 let payload = build_canonical_payload(
253 tenant,
254 "telegram",
255 &provider_ids,
256 session_key.clone(),
257 &["chat".into()],
258 timestamp,
259 None,
260 Some(text.to_string()),
261 Vec::new(),
262 Vec::new(),
263 empty_entities(),
264 default_metadata(),
265 json!({ "chat_id": message.chat.id }),
266 raw,
267 );
268
269 MappedTelegram {
270 provider_ids,
271 session_key,
272 timestamp,
273 payload,
274 channel: Some(chat_id.clone()),
275 conversation: Some(chat_id),
276 user,
277 }
278}
279
280#[cfg(test)]
281mod tests {
282 use super::*;
283 use serde_json::json;
284
285 #[test]
286 fn collect_text_from_array_and_objects() {
287 let payload = json!([
288 { "text": "hello" },
289 { "messages": [{ "text": "nested" }, "raw"] },
290 null,
291 "world"
292 ]);
293 let replies = collect_text_responses(&payload);
294 assert_eq!(replies, vec!["hello", "nested", "raw", "world"]);
295 }
296
297 #[test]
298 fn collect_text_from_single_object() {
299 let payload = json!({ "text": "only" });
300 let replies = collect_text_responses(&payload);
301 assert_eq!(replies, vec!["only"]);
302 }
303
304 #[test]
305 fn telegram_activity_maps_to_canonical_payload() {
306 let update = TelegramUpdate {
307 update_id: 42,
308 message: Some(TelegramMessage {
309 text: Some("Hello".into()),
310 chat: TelegramChat { id: 123 },
311 from: Some(TelegramUser { id: 777 }),
312 }),
313 };
314 let message = update.message.clone().unwrap();
315 let raw = serde_json::to_value(&update).unwrap();
316 let mapped = map_telegram_activity("demo", &message, "Hello", update.update_id, raw);
317 assert_eq!(mapped.session_key, "demo:telegram:123:777");
318 assert_eq!(mapped.provider_ids.conversation_id.as_deref(), Some("123"));
319 assert_eq!(mapped.provider_ids.user_id.as_deref(), Some("777"));
320 assert_eq!(mapped.payload["provider"], json!("telegram"));
321 assert_eq!(mapped.payload["text"], json!("Hello"));
322 }
323}