greentic_runner_host/runner/
adapt_messaging.rs1use std::sync::Arc;
2
3use anyhow::{Result, bail};
4use axum::extract::{Json, State};
5use axum::http::StatusCode;
6use serde::Deserialize;
7use serde_json::json;
8
9use super::{ServerState, engine::FlowContext};
10
11#[derive(Debug, Deserialize)]
12pub struct TelegramUpdate {
13 update_id: i64,
14 #[serde(default)]
15 message: Option<TelegramMessage>,
16}
17
18#[derive(Debug, Deserialize)]
19pub(crate) struct TelegramMessage {
20 #[serde(default)]
21 text: Option<String>,
22 chat: TelegramChat,
23 #[serde(default)]
24 from: Option<TelegramUser>,
25}
26
27#[derive(Debug, Deserialize)]
28pub(crate) struct TelegramChat {
29 id: i64,
30}
31
32#[derive(Debug, Deserialize)]
33pub(crate) struct TelegramUser {
34 id: i64,
35}
36
37pub async fn telegram_webhook(
38 State(state): State<Arc<ServerState>>,
39 Json(update): Json<TelegramUpdate>,
40) -> StatusCode {
41 if let Some(status) = {
42 let mut cache = state.telegram_cache.lock();
43 cache.get(&update.update_id).copied()
44 } {
45 tracing::debug!(
46 update_id = update.update_id,
47 status = %status,
48 "duplicate telegram update skipped"
49 );
50 return status;
51 }
52
53 let message = match update.message {
54 Some(msg) => msg,
55 None => {
56 tracing::debug!(update_id = update.update_id, "no message payload in update");
57 return remember_status(state.as_ref(), update.update_id, StatusCode::NO_CONTENT);
58 }
59 };
60
61 let text = match message.text {
62 Some(text) if !text.trim().is_empty() => text,
63 _ => {
64 tracing::debug!(update_id = update.update_id, "ignoring non-text message");
65 return remember_status(state.as_ref(), update.update_id, StatusCode::NO_CONTENT);
66 }
67 };
68
69 let flow = match state.engine.flow_by_type("messaging") {
70 Some(flow) => flow,
71 None => {
72 tracing::error!("no messaging flow registered in pack");
73 return StatusCode::NOT_FOUND;
74 }
75 };
76
77 let payload = json!({
78 "chat_id": message.chat.id,
79 "text": text,
80 "user_id": message.from.as_ref().map(|user| user.id),
81 "update_id": update.update_id,
82 });
83
84 match state
85 .engine
86 .execute(
87 FlowContext {
88 tenant: &state.config.tenant,
89 flow_id: &flow.id,
90 node_id: None,
91 tool: None,
92 action: Some("messaging"),
93 session_id: None,
94 provider_id: None,
95 retry_config: state.config.mcp_retry_config().into(),
96 observer: None,
97 mocks: None,
98 },
99 payload,
100 )
101 .await
102 {
103 Ok(response) => {
104 if let Some(outgoing_text) = extract_text_response(&response)
105 && let Err(err) =
106 send_telegram_message(state.as_ref(), message.chat.id, &outgoing_text).await
107 {
108 tracing::error!(
109 flow_id = %flow.id,
110 update_id = update.update_id,
111 error = %err,
112 "failed to send telegram message"
113 );
114 return remember_status(state.as_ref(), update.update_id, StatusCode::BAD_GATEWAY);
115 }
116 tracing::info!(
117 flow_id = %flow.id,
118 update_id = update.update_id,
119 response = %response,
120 "flow completed"
121 );
122 remember_status(state.as_ref(), update.update_id, StatusCode::OK)
123 }
124 Err(err) => {
125 let chained = err.chain().map(|e| e.to_string()).collect::<Vec<_>>();
126 tracing::error!(
127 flow_id = %flow.id,
128 update_id = update.update_id,
129 error.cause_chain = ?chained,
130 "flow execution failed"
131 );
132 remember_status(
133 state.as_ref(),
134 update.update_id,
135 StatusCode::INTERNAL_SERVER_ERROR,
136 )
137 }
138 }
139}
140
141fn extract_text_response(value: &serde_json::Value) -> Option<String> {
142 if let Some(text) = value.get("text").and_then(|v| v.as_str()) {
143 return Some(text.to_owned());
144 }
145 if let Some(text) = value.as_str() {
146 return Some(text.to_owned());
147 }
148 None
149}
150
151async fn send_telegram_message(state: &ServerState, chat_id: i64, text: &str) -> Result<()> {
152 if !state.messaging_rate.lock().try_acquire() {
153 bail!("messaging send rate exceeded");
154 }
155
156 let token = state.get_secret("TELEGRAM_BOT_TOKEN")?;
157 let url = format!("https://api.telegram.org/bot{token}/sendMessage");
158 let body = json!({
159 "chat_id": chat_id,
160 "text": text,
161 "parse_mode": "MarkdownV2",
162 });
163 state
164 .http_client
165 .post(url)
166 .json(&body)
167 .send()
168 .await?
169 .error_for_status()?;
170 Ok(())
171}
172
173fn remember_status(state: &ServerState, update_id: i64, status: StatusCode) -> StatusCode {
174 let mut cache = state.telegram_cache.lock();
175 cache.put(update_id, status);
176 status
177}