Skip to main content

roboticus_api/api/routes/agent/
poll_loops.rs

1//! Platform-specific polling loops for Telegram, Discord, Signal, and Email.
2
3use std::sync::Arc;
4
5use roboticus_channels::ChannelAdapter;
6
7use super::AppState;
8
/// Upper bound on the number of inbound channel messages one platform loop
/// processes simultaneously.
///
/// Each poll loop in this module sizes its own semaphore with this value, so
/// the limit applies per platform rather than across all platforms.
const CHANNEL_CONCURRENCY: usize = 8;
11
/// User-facing reply text sent back to a chat when processing of an inbound
/// message returned an error.
pub(crate) const CHANNEL_PROCESSING_ERROR_REPLY: &str =
    "I hit an internal processing error while handling that message. Please retry in a moment.";
14
15pub async fn telegram_poll_loop(state: AppState) {
16    static CHANNEL_SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
17        std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));
18
19    let adapter = match &state.telegram {
20        Some(a) => a.clone(),
21        None => return,
22    };
23
24    tracing::info!("Telegram long-poll loop started");
25    let mut consecutive_auth_failures: u32 = 0;
26
27    loop {
28        match adapter.recv().await {
29            Ok(Some(inbound)) => {
30                consecutive_auth_failures = 0;
31                state.channel_router.record_received("telegram").await;
32                let state = state.clone();
33                let semaphore = Arc::clone(&CHANNEL_SEMAPHORE);
34                let inbound_for_error = inbound.clone();
35                tokio::spawn(async move {
36                    let _permit = match semaphore.acquire_owned().await {
37                        Ok(p) => p,
38                        Err(_) => return,
39                    };
40                    if let Err(e) = super::process_channel_message(&state, inbound).await {
41                        state
42                            .channel_router
43                            .record_processing_error("telegram", e.clone())
44                            .await;
45                        let chat_id = super::resolve_channel_chat_id(&inbound_for_error);
46                        if let Err(send_err) = state
47                            .channel_router
48                            .send_reply(
49                                "telegram",
50                                &chat_id,
51                                CHANNEL_PROCESSING_ERROR_REPLY.to_string(),
52                            )
53                            .await
54                        {
55                            tracing::warn!(
56                                error = %send_err,
57                                "failed to send Telegram processing failure reply"
58                            );
59                        }
60                        tracing::error!(error = %e, "Telegram message processing failed");
61                    }
62                });
63            }
64            Ok(None) => {
65                consecutive_auth_failures = 0;
66            }
67            Err(e) => {
68                let err_text = e.to_string();
69                let looks_like_auth = err_text.contains("Telegram API 404")
70                    || err_text.contains("Telegram API 401")
71                    || err_text
72                        .to_ascii_lowercase()
73                        .contains("invalid/revoked bot token");
74                if looks_like_auth {
75                    consecutive_auth_failures = consecutive_auth_failures.saturating_add(1);
76                    let backoff = if consecutive_auth_failures < 3 {
77                        15
78                    } else if consecutive_auth_failures < 10 {
79                        30
80                    } else {
81                        60
82                    };
83                    if consecutive_auth_failures == 1
84                        || consecutive_auth_failures.is_multiple_of(10)
85                    {
86                        tracing::error!(
87                            error = %e,
88                            failures = consecutive_auth_failures,
89                            "Telegram poll authentication failed (likely invalid/revoked token). Repair with: `roboticus keystore set telegram_bot_token \"<TOKEN>\"` then restart."
90                        );
91                    } else {
92                        tracing::warn!(
93                            error = %e,
94                            failures = consecutive_auth_failures,
95                            "Telegram auth failure persists; backing off"
96                        );
97                    }
98                    tokio::time::sleep(std::time::Duration::from_secs(backoff)).await;
99                } else {
100                    consecutive_auth_failures = 0;
101                    tracing::error!(error = %e, "Telegram poll error, backing off 5s");
102                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
103                }
104            }
105        }
106    }
107}
108
109pub async fn discord_poll_loop(state: AppState) {
110    static CHANNEL_SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
111        std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));
112    let adapter = match &state.discord {
113        Some(a) => a.clone(),
114        None => return,
115    };
116    tracing::info!("Discord inbound loop started");
117    loop {
118        match adapter.recv().await {
119            Ok(Some(inbound)) => {
120                state.channel_router.record_received("discord").await;
121                let state = state.clone();
122                let semaphore = Arc::clone(&CHANNEL_SEMAPHORE);
123                tokio::spawn(async move {
124                    let _permit = match semaphore.acquire_owned().await {
125                        Ok(p) => p,
126                        Err(_) => return,
127                    };
128                    if let Err(e) = super::process_channel_message(&state, inbound).await {
129                        state
130                            .channel_router
131                            .record_processing_error("discord", e.clone())
132                            .await;
133                        tracing::error!(error = %e, "Discord message processing failed");
134                    }
135                });
136            }
137            Ok(None) => tokio::time::sleep(std::time::Duration::from_millis(300)).await,
138            Err(e) => {
139                tracing::error!(error = %e, "Discord inbound loop error, backing off 5s");
140                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
141            }
142        }
143    }
144}
145
146pub async fn signal_poll_loop(state: AppState) {
147    static CHANNEL_SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
148        std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));
149    let adapter = match &state.signal {
150        Some(a) => a.clone(),
151        None => return,
152    };
153    tracing::info!("Signal inbound loop started");
154    loop {
155        match adapter.recv().await {
156            Ok(Some(inbound)) => {
157                state.channel_router.record_received("signal").await;
158                let state = state.clone();
159                let semaphore = Arc::clone(&CHANNEL_SEMAPHORE);
160                tokio::spawn(async move {
161                    let _permit = match semaphore.acquire_owned().await {
162                        Ok(p) => p,
163                        Err(_) => return,
164                    };
165                    if let Err(e) = super::process_channel_message(&state, inbound).await {
166                        state
167                            .channel_router
168                            .record_processing_error("signal", e.clone())
169                            .await;
170                        tracing::error!(error = %e, "Signal message processing failed");
171                    }
172                });
173            }
174            Ok(None) => tokio::time::sleep(std::time::Duration::from_millis(300)).await,
175            Err(e) => {
176                tracing::error!(error = %e, "Signal inbound loop error, backing off 5s");
177                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
178            }
179        }
180    }
181}
182
183pub async fn email_poll_loop(state: AppState) {
184    static CHANNEL_SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
185        std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));
186    let adapter = match &state.email {
187        Some(a) => a.clone(),
188        None => return,
189    };
190    tracing::info!("Email inbound loop started");
191    loop {
192        match adapter.recv().await {
193            Ok(Some(inbound)) => {
194                state.channel_router.record_received("email").await;
195                let state = state.clone();
196                let semaphore = Arc::clone(&CHANNEL_SEMAPHORE);
197                tokio::spawn(async move {
198                    let _permit = match semaphore.acquire_owned().await {
199                        Ok(p) => p,
200                        Err(_) => return,
201                    };
202                    if let Err(e) = super::process_channel_message(&state, inbound).await {
203                        state
204                            .channel_router
205                            .record_processing_error("email", e.clone())
206                            .await;
207                        tracing::error!(error = %e, "Email message processing failed");
208                    }
209                });
210            }
211            Ok(None) => tokio::time::sleep(std::time::Duration::from_secs(1)).await,
212            Err(e) => {
213                tracing::error!(error = %e, "Email inbound loop error, backing off 5s");
214                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
215            }
216        }
217    }
218}