greentic_runner_host/runner/
adapt_messaging.rs

1use std::sync::Arc;
2
3use anyhow::{Result, bail};
4use axum::extract::{Json, State};
5use axum::http::StatusCode;
6use serde::Deserialize;
7use serde_json::json;
8
9use super::{ServerState, engine::FlowContext};
10
11#[derive(Debug, Deserialize)]
12pub struct TelegramUpdate {
13    update_id: i64,
14    #[serde(default)]
15    message: Option<TelegramMessage>,
16}
17
18#[derive(Debug, Deserialize)]
19pub(crate) struct TelegramMessage {
20    #[serde(default)]
21    text: Option<String>,
22    chat: TelegramChat,
23    #[serde(default)]
24    from: Option<TelegramUser>,
25}
26
27#[derive(Debug, Deserialize)]
28pub(crate) struct TelegramChat {
29    id: i64,
30}
31
32#[derive(Debug, Deserialize)]
33pub(crate) struct TelegramUser {
34    id: i64,
35}
36
37pub async fn telegram_webhook(
38    State(state): State<Arc<ServerState>>,
39    Json(update): Json<TelegramUpdate>,
40) -> StatusCode {
41    if let Some(status) = {
42        let mut cache = state.telegram_cache.lock();
43        cache.get(&update.update_id).copied()
44    } {
45        tracing::debug!(
46            update_id = update.update_id,
47            status = %status,
48            "duplicate telegram update skipped"
49        );
50        return status;
51    }
52
53    let message = match update.message {
54        Some(msg) => msg,
55        None => {
56            tracing::debug!(update_id = update.update_id, "no message payload in update");
57            return remember_status(state.as_ref(), update.update_id, StatusCode::NO_CONTENT);
58        }
59    };
60
61    let text = match message.text {
62        Some(text) if !text.trim().is_empty() => text,
63        _ => {
64            tracing::debug!(update_id = update.update_id, "ignoring non-text message");
65            return remember_status(state.as_ref(), update.update_id, StatusCode::NO_CONTENT);
66        }
67    };
68
69    let flow = match state.engine.flow_by_type("messaging") {
70        Some(flow) => flow,
71        None => {
72            tracing::error!("no messaging flow registered in pack");
73            return StatusCode::NOT_FOUND;
74        }
75    };
76
77    let payload = json!({
78        "chat_id": message.chat.id,
79        "text": text,
80        "user_id": message.from.as_ref().map(|user| user.id),
81        "update_id": update.update_id,
82    });
83
84    match state
85        .engine
86        .execute(
87            FlowContext {
88                tenant: &state.config.tenant,
89                flow_id: &flow.id,
90                node_id: None,
91                tool: None,
92                action: Some("messaging"),
93                session_id: None,
94                provider_id: None,
95                retry_config: state.config.mcp_retry_config().into(),
96                observer: None,
97                mocks: None,
98            },
99            payload,
100        )
101        .await
102    {
103        Ok(response) => {
104            if let Some(outgoing_text) = extract_text_response(&response)
105                && let Err(err) =
106                    send_telegram_message(state.as_ref(), message.chat.id, &outgoing_text).await
107            {
108                tracing::error!(
109                    flow_id = %flow.id,
110                    update_id = update.update_id,
111                    error = %err,
112                    "failed to send telegram message"
113                );
114                return remember_status(state.as_ref(), update.update_id, StatusCode::BAD_GATEWAY);
115            }
116            tracing::info!(
117                flow_id = %flow.id,
118                update_id = update.update_id,
119                response = %response,
120                "flow completed"
121            );
122            remember_status(state.as_ref(), update.update_id, StatusCode::OK)
123        }
124        Err(err) => {
125            let chained = err.chain().map(|e| e.to_string()).collect::<Vec<_>>();
126            tracing::error!(
127                flow_id = %flow.id,
128                update_id = update.update_id,
129                error.cause_chain = ?chained,
130                "flow execution failed"
131            );
132            remember_status(
133                state.as_ref(),
134                update.update_id,
135                StatusCode::INTERNAL_SERVER_ERROR,
136            )
137        }
138    }
139}
140
141fn extract_text_response(value: &serde_json::Value) -> Option<String> {
142    if let Some(text) = value.get("text").and_then(|v| v.as_str()) {
143        return Some(text.to_owned());
144    }
145    if let Some(text) = value.as_str() {
146        return Some(text.to_owned());
147    }
148    None
149}
150
151async fn send_telegram_message(state: &ServerState, chat_id: i64, text: &str) -> Result<()> {
152    if !state.messaging_rate.lock().try_acquire() {
153        bail!("messaging send rate exceeded");
154    }
155
156    let token = state.get_secret("TELEGRAM_BOT_TOKEN")?;
157    let url = format!("https://api.telegram.org/bot{token}/sendMessage");
158    let body = json!({
159        "chat_id": chat_id,
160        "text": text,
161        "parse_mode": "MarkdownV2",
162    });
163    state
164        .http_client
165        .post(url)
166        .json(&body)
167        .send()
168        .await?
169        .error_for_status()?;
170    Ok(())
171}
172
173fn remember_status(state: &ServerState, update_id: i64, status: StatusCode) -> StatusCode {
174    let mut cache = state.telegram_cache.lock();
175    cache.put(update_id, status);
176    status
177}