Skip to main content

roboticus_api/api/routes/agent/
poll_loops.rs

1//! Platform-specific polling loops for Telegram, Discord, Signal, and Email.
2
3use std::sync::Arc;
4
5use roboticus_channels::ChannelAdapter;
6
7use super::AppState;
8
/// User-facing reply text sent back to a chat when handling an inbound message fails.
///
/// In this file the Telegram poll loop sends it through `channel_router.send_reply`
/// after `process_channel_message` returns an error. It is `pub(crate)`, so it is
/// visible to the rest of the crate as well.
9pub(crate) const CHANNEL_PROCESSING_ERROR_REPLY: &str =
10    "I hit an internal processing error while handling that message. Please retry in a moment.";
11
12pub async fn telegram_poll_loop(state: AppState) {
13    static CHANNEL_SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
14        std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(8)));
15
16    let adapter = match &state.telegram {
17        Some(a) => a.clone(),
18        None => return,
19    };
20
21    tracing::info!("Telegram long-poll loop started");
22    let mut consecutive_auth_failures: u32 = 0;
23
24    loop {
25        match adapter.recv().await {
26            Ok(Some(inbound)) => {
27                consecutive_auth_failures = 0;
28                state.channel_router.record_received("telegram").await;
29                let state = state.clone();
30                let semaphore = Arc::clone(&CHANNEL_SEMAPHORE);
31                let inbound_for_error = inbound.clone();
32                tokio::spawn(async move {
33                    let _permit = match semaphore.acquire_owned().await {
34                        Ok(p) => p,
35                        Err(_) => return,
36                    };
37                    if let Err(e) = super::process_channel_message(&state, inbound).await {
38                        state
39                            .channel_router
40                            .record_processing_error("telegram", e.clone())
41                            .await;
42                        let chat_id = super::resolve_channel_chat_id(&inbound_for_error);
43                        if let Err(send_err) = state
44                            .channel_router
45                            .send_reply(
46                                "telegram",
47                                &chat_id,
48                                CHANNEL_PROCESSING_ERROR_REPLY.to_string(),
49                            )
50                            .await
51                        {
52                            tracing::warn!(
53                                error = %send_err,
54                                "failed to send Telegram processing failure reply"
55                            );
56                        }
57                        tracing::error!(error = %e, "Telegram message processing failed");
58                    }
59                });
60            }
61            Ok(None) => {
62                consecutive_auth_failures = 0;
63            }
64            Err(e) => {
65                let err_text = e.to_string();
66                let looks_like_auth = err_text.contains("Telegram API 404")
67                    || err_text.contains("Telegram API 401")
68                    || err_text
69                        .to_ascii_lowercase()
70                        .contains("invalid/revoked bot token");
71                if looks_like_auth {
72                    consecutive_auth_failures = consecutive_auth_failures.saturating_add(1);
73                    let backoff = if consecutive_auth_failures < 3 {
74                        15
75                    } else if consecutive_auth_failures < 10 {
76                        30
77                    } else {
78                        60
79                    };
80                    if consecutive_auth_failures == 1
81                        || consecutive_auth_failures.is_multiple_of(10)
82                    {
83                        tracing::error!(
84                            error = %e,
85                            failures = consecutive_auth_failures,
86                            "Telegram poll authentication failed (likely invalid/revoked token). Repair with: `roboticus keystore set telegram_bot_token \"<TOKEN>\"` then restart."
87                        );
88                    } else {
89                        tracing::warn!(
90                            error = %e,
91                            failures = consecutive_auth_failures,
92                            "Telegram auth failure persists; backing off"
93                        );
94                    }
95                    tokio::time::sleep(std::time::Duration::from_secs(backoff)).await;
96                } else {
97                    consecutive_auth_failures = 0;
98                    tracing::error!(error = %e, "Telegram poll error, backing off 5s");
99                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
100                }
101            }
102        }
103    }
104}
105
106pub async fn discord_poll_loop(state: AppState) {
107    static CHANNEL_SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
108        std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(8)));
109    let adapter = match &state.discord {
110        Some(a) => a.clone(),
111        None => return,
112    };
113    tracing::info!("Discord inbound loop started");
114    loop {
115        match adapter.recv().await {
116            Ok(Some(inbound)) => {
117                state.channel_router.record_received("discord").await;
118                let state = state.clone();
119                let semaphore = Arc::clone(&CHANNEL_SEMAPHORE);
120                tokio::spawn(async move {
121                    let _permit = match semaphore.acquire_owned().await {
122                        Ok(p) => p,
123                        Err(_) => return,
124                    };
125                    if let Err(e) = super::process_channel_message(&state, inbound).await {
126                        state
127                            .channel_router
128                            .record_processing_error("discord", e.clone())
129                            .await;
130                        tracing::error!(error = %e, "Discord message processing failed");
131                    }
132                });
133            }
134            Ok(None) => tokio::time::sleep(std::time::Duration::from_millis(300)).await,
135            Err(e) => {
136                tracing::error!(error = %e, "Discord inbound loop error, backing off 5s");
137                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
138            }
139        }
140    }
141}
142
143pub async fn signal_poll_loop(state: AppState) {
144    static CHANNEL_SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
145        std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(8)));
146    let adapter = match &state.signal {
147        Some(a) => a.clone(),
148        None => return,
149    };
150    tracing::info!("Signal inbound loop started");
151    loop {
152        match adapter.recv().await {
153            Ok(Some(inbound)) => {
154                state.channel_router.record_received("signal").await;
155                let state = state.clone();
156                let semaphore = Arc::clone(&CHANNEL_SEMAPHORE);
157                tokio::spawn(async move {
158                    let _permit = match semaphore.acquire_owned().await {
159                        Ok(p) => p,
160                        Err(_) => return,
161                    };
162                    if let Err(e) = super::process_channel_message(&state, inbound).await {
163                        state
164                            .channel_router
165                            .record_processing_error("signal", e.clone())
166                            .await;
167                        tracing::error!(error = %e, "Signal message processing failed");
168                    }
169                });
170            }
171            Ok(None) => tokio::time::sleep(std::time::Duration::from_millis(300)).await,
172            Err(e) => {
173                tracing::error!(error = %e, "Signal inbound loop error, backing off 5s");
174                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
175            }
176        }
177    }
178}
179
180pub async fn email_poll_loop(state: AppState) {
181    static CHANNEL_SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
182        std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(8)));
183    let adapter = match &state.email {
184        Some(a) => a.clone(),
185        None => return,
186    };
187    tracing::info!("Email inbound loop started");
188    loop {
189        match adapter.recv().await {
190            Ok(Some(inbound)) => {
191                state.channel_router.record_received("email").await;
192                let state = state.clone();
193                let semaphore = Arc::clone(&CHANNEL_SEMAPHORE);
194                tokio::spawn(async move {
195                    let _permit = match semaphore.acquire_owned().await {
196                        Ok(p) => p,
197                        Err(_) => return,
198                    };
199                    if let Err(e) = super::process_channel_message(&state, inbound).await {
200                        state
201                            .channel_router
202                            .record_processing_error("email", e.clone())
203                            .await;
204                        tracing::error!(error = %e, "Email message processing failed");
205                    }
206                });
207            }
208            Ok(None) => tokio::time::sleep(std::time::Duration::from_secs(1)).await,
209            Err(e) => {
210                tracing::error!(error = %e, "Email inbound loop error, backing off 5s");
211                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
212            }
213        }
214    }
215}