greentic_runner_host/runner/
adapt_messaging.rs1use anyhow::{Result, bail};
2use axum::extract::Json;
3use axum::http::StatusCode;
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use serde_json::{Value, json};
7
8use crate::engine::runtime::IngressEnvelope;
9use crate::ingress::{
10 ProviderIds, build_canonical_payload, canonical_session_key, default_metadata, empty_entities,
11};
12use crate::provider_core_only;
13use crate::routing::TenantRuntimeHandle;
14use crate::runtime::TenantRuntime;
15
16#[derive(Debug, Serialize, Deserialize)]
17pub struct TelegramUpdate {
18 update_id: i64,
19 #[serde(default)]
20 message: Option<TelegramMessage>,
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub(crate) struct TelegramMessage {
25 #[serde(default)]
26 text: Option<String>,
27 chat: TelegramChat,
28 #[serde(default)]
29 from: Option<TelegramUser>,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub(crate) struct TelegramChat {
34 id: i64,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub(crate) struct TelegramUser {
39 id: i64,
40}
41
42pub async fn telegram_webhook(
43 TenantRuntimeHandle { tenant, runtime }: TenantRuntimeHandle,
44 Json(update): Json<TelegramUpdate>,
45) -> StatusCode {
46 if provider_core_only::is_enabled() {
47 tracing::warn!(
48 update_id = update.update_id,
49 "provider-core only mode enabled; blocking telegram webhook"
50 );
51 return StatusCode::NOT_IMPLEMENTED;
52 }
53
54 if let Some(status) = {
55 let mut cache = runtime.telegram_cache().lock();
56 cache.get(&update.update_id).copied()
57 } {
58 tracing::debug!(
59 update_id = update.update_id,
60 status = %status,
61 "duplicate telegram update skipped"
62 );
63 return status;
64 }
65
66 let message = match update.message.as_ref() {
67 Some(msg) => msg,
68 None => {
69 tracing::debug!(update_id = update.update_id, "no message payload in update");
70 return remember_status(runtime.as_ref(), update.update_id, StatusCode::NO_CONTENT);
71 }
72 };
73
74 let text = match message.text.as_ref() {
75 Some(text) if !text.trim().is_empty() => text.clone(),
76 _ => {
77 tracing::debug!(update_id = update.update_id, "ignoring non-text message");
78 return remember_status(runtime.as_ref(), update.update_id, StatusCode::NO_CONTENT);
79 }
80 };
81
82 let engine = runtime.engine();
83 let flow = match engine.flow_by_type("messaging") {
84 Some(flow) => flow,
85 None => {
86 tracing::error!("no messaging flow registered in pack");
87 return StatusCode::NOT_FOUND;
88 }
89 };
90
91 let raw_value = serde_json::to_value(&update).unwrap_or(Value::Null);
92 let mapped = map_telegram_activity(&tenant, message, &text, update.update_id, raw_value);
93
94 let envelope = IngressEnvelope {
95 tenant: tenant.clone(),
96 env: None,
97 pack_id: Some(flow.pack_id.clone()),
98 flow_id: flow.id.clone(),
99 flow_type: Some(flow.flow_type.clone()),
100 action: Some("messaging".into()),
101 session_hint: Some(mapped.session_key.clone()),
102 provider: Some("telegram".into()),
103 channel: mapped.channel.clone(),
104 conversation: mapped.conversation.clone(),
105 user: mapped.user.clone(),
106 activity_id: mapped.provider_ids.message_id.clone(),
107 timestamp: Some(mapped.timestamp.to_rfc3339()),
108 payload: mapped.payload,
109 metadata: None,
110 reply_scope: None,
111 }
112 .canonicalize();
113
114 match runtime.state_machine().handle(envelope).await {
115 Ok(response) => {
116 let replies = collect_text_responses(&response);
117 if replies.is_empty() {
118 tracing::info!(
119 flow_id = %flow.id,
120 update_id = update.update_id,
121 "flow completed without telegram replies"
122 );
123 return remember_status(runtime.as_ref(), update.update_id, StatusCode::NO_CONTENT);
124 }
125
126 for text in &replies {
127 if let Err(err) =
128 send_telegram_message(runtime.as_ref(), message.chat.id, text).await
129 {
130 tracing::error!(
131 flow_id = %flow.id,
132 update_id = update.update_id,
133 error = %err,
134 "failed to send telegram message"
135 );
136 return remember_status(
137 runtime.as_ref(),
138 update.update_id,
139 StatusCode::BAD_GATEWAY,
140 );
141 }
142 }
143
144 tracing::info!(
145 flow_id = %flow.id,
146 update_id = update.update_id,
147 replies = replies.len(),
148 "flow completed"
149 );
150 remember_status(runtime.as_ref(), update.update_id, StatusCode::OK)
151 }
152 Err(err) => {
153 let chained = err.chain().map(|e| e.to_string()).collect::<Vec<_>>();
154 tracing::error!(
155 flow_id = %flow.id,
156 update_id = update.update_id,
157 error.cause_chain = ?chained,
158 "flow execution failed"
159 );
160 remember_status(
161 runtime.as_ref(),
162 update.update_id,
163 StatusCode::INTERNAL_SERVER_ERROR,
164 )
165 }
166 }
167}
168
169async fn send_telegram_message(runtime: &TenantRuntime, chat_id: i64, text: &str) -> Result<()> {
170 if !runtime.messaging_rate().lock().try_acquire() {
171 bail!("messaging send rate exceeded");
172 }
173
174 let token = runtime.get_secret("TELEGRAM_BOT_TOKEN")?;
175 let url = format!("https://api.telegram.org/bot{token}/sendMessage");
176 let body = json!({
177 "chat_id": chat_id,
178 "text": text,
179 "parse_mode": "MarkdownV2",
180 });
181 runtime
182 .http_client()
183 .post(url)
184 .json(&body)
185 .send()
186 .await?
187 .error_for_status()?;
188 Ok(())
189}
190
191fn remember_status(runtime: &TenantRuntime, update_id: i64, status: StatusCode) -> StatusCode {
192 let mut cache = runtime.telegram_cache().lock();
193 cache.put(update_id, status);
194 status
195}
196
197fn collect_text_responses(value: &serde_json::Value) -> Vec<String> {
198 match value {
199 serde_json::Value::Null => Vec::new(),
200 serde_json::Value::String(text) => vec![text.to_owned()],
201 serde_json::Value::Array(items) => {
202 let mut replies = Vec::new();
203 for item in items {
204 replies.extend(collect_text_responses(item));
205 }
206 replies
207 }
208 serde_json::Value::Object(map) => {
209 if let Some(messages) = map.get("messages").and_then(|v| v.as_array()) {
210 let mut replies = Vec::new();
211 for entry in messages {
212 replies.extend(collect_text_responses(entry));
213 }
214 return replies;
215 }
216 map.get("text")
217 .and_then(|v| v.as_str())
218 .map(|text| vec![text.to_owned()])
219 .unwrap_or_default()
220 }
221 _ => Vec::new(),
222 }
223}
224
225struct MappedTelegram {
226 provider_ids: ProviderIds,
227 session_key: String,
228 timestamp: DateTime<Utc>,
229 payload: Value,
230 channel: Option<String>,
231 conversation: Option<String>,
232 user: Option<String>,
233}
234
235fn map_telegram_activity(
236 tenant: &str,
237 message: &TelegramMessage,
238 text: &str,
239 update_id: i64,
240 raw: Value,
241) -> MappedTelegram {
242 let chat_id = message.chat.id.to_string();
243 let user = message.from.as_ref().map(|user| user.id.to_string());
244 let provider_ids = ProviderIds {
245 channel_id: Some(chat_id.clone()),
246 conversation_id: Some(chat_id.clone()),
247 user_id: user.clone(),
248 message_id: Some(update_id.to_string()),
249 event_id: Some(update_id.to_string()),
250 ..ProviderIds::default()
251 };
252 let timestamp = Utc::now();
253 let session_key = canonical_session_key(tenant, "telegram", &provider_ids);
254 let payload = build_canonical_payload(
255 tenant,
256 "telegram",
257 &provider_ids,
258 session_key.clone(),
259 &["chat".into()],
260 timestamp,
261 None,
262 Some(text.to_string()),
263 Vec::new(),
264 Vec::new(),
265 empty_entities(),
266 default_metadata(),
267 json!({ "chat_id": message.chat.id }),
268 raw,
269 );
270
271 MappedTelegram {
272 provider_ids,
273 session_key,
274 timestamp,
275 payload,
276 channel: Some(chat_id.clone()),
277 conversation: Some(chat_id),
278 user,
279 }
280}
281
282#[cfg(test)]
283mod tests {
284 use super::*;
285 use serde_json::json;
286
287 #[test]
288 fn collect_text_from_array_and_objects() {
289 let payload = json!([
290 { "text": "hello" },
291 { "messages": [{ "text": "nested" }, "raw"] },
292 null,
293 "world"
294 ]);
295 let replies = collect_text_responses(&payload);
296 assert_eq!(replies, vec!["hello", "nested", "raw", "world"]);
297 }
298
299 #[test]
300 fn collect_text_from_single_object() {
301 let payload = json!({ "text": "only" });
302 let replies = collect_text_responses(&payload);
303 assert_eq!(replies, vec!["only"]);
304 }
305
306 #[test]
307 fn telegram_activity_maps_to_canonical_payload() {
308 let update = TelegramUpdate {
309 update_id: 42,
310 message: Some(TelegramMessage {
311 text: Some("Hello".into()),
312 chat: TelegramChat { id: 123 },
313 from: Some(TelegramUser { id: 777 }),
314 }),
315 };
316 let message = update.message.clone().unwrap();
317 let raw = serde_json::to_value(&update).unwrap();
318 let mapped = map_telegram_activity("demo", &message, "Hello", update.update_id, raw);
319 assert_eq!(mapped.session_key, "demo:telegram:123:777");
320 assert_eq!(mapped.provider_ids.conversation_id.as_deref(), Some("123"));
321 assert_eq!(mapped.provider_ids.user_id.as_deref(), Some("777"));
322 assert_eq!(mapped.payload["provider"], json!("telegram"));
323 assert_eq!(mapped.payload["text"], json!("Hello"));
324 }
325}