roboticus_api/api/routes/agent/
poll_loops.rs1use std::sync::Arc;
4
5use roboticus_channels::ChannelAdapter;
6
7use super::AppState;
8
/// Permit count used when each poll loop in this file builds its own semaphore, i.e. the
/// maximum number of spawned message-processing tasks per loop that may hold a permit at once.
const CHANNEL_CONCURRENCY: usize = 8;
11
/// User-facing text sent back to the originating chat when processing an inbound message
/// fails (in this file it is used by the Telegram poll loop).
pub(crate) const CHANNEL_PROCESSING_ERROR_REPLY: &str =
    "I hit an internal processing error while handling that message. Please retry in a moment.";
14
15pub async fn telegram_poll_loop(state: AppState) {
16 static CHANNEL_SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
17 std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));
18
19 let adapter = match &state.telegram {
20 Some(a) => a.clone(),
21 None => return,
22 };
23
24 tracing::info!("Telegram long-poll loop started");
25 let mut consecutive_auth_failures: u32 = 0;
26
27 loop {
28 match adapter.recv().await {
29 Ok(Some(inbound)) => {
30 consecutive_auth_failures = 0;
31 state.channel_router.record_received("telegram").await;
32 let state = state.clone();
33 let semaphore = Arc::clone(&CHANNEL_SEMAPHORE);
34 let inbound_for_error = inbound.clone();
35 tokio::spawn(async move {
36 let _permit = match semaphore.acquire_owned().await {
37 Ok(p) => p,
38 Err(_) => return,
39 };
40 if let Err(e) = super::process_channel_message(&state, inbound).await {
41 state
42 .channel_router
43 .record_processing_error("telegram", e.clone())
44 .await;
45 let chat_id = super::resolve_channel_chat_id(&inbound_for_error);
46 if let Err(send_err) = state
47 .channel_router
48 .send_reply(
49 "telegram",
50 &chat_id,
51 CHANNEL_PROCESSING_ERROR_REPLY.to_string(),
52 )
53 .await
54 {
55 tracing::warn!(
56 error = %send_err,
57 "failed to send Telegram processing failure reply"
58 );
59 }
60 tracing::error!(error = %e, "Telegram message processing failed");
61 }
62 });
63 }
64 Ok(None) => {
65 consecutive_auth_failures = 0;
66 }
67 Err(e) => {
68 let err_text = e.to_string();
69 let looks_like_auth = err_text.contains("Telegram API 404")
70 || err_text.contains("Telegram API 401")
71 || err_text
72 .to_ascii_lowercase()
73 .contains("invalid/revoked bot token");
74 if looks_like_auth {
75 consecutive_auth_failures = consecutive_auth_failures.saturating_add(1);
76 let backoff = if consecutive_auth_failures < 3 {
77 15
78 } else if consecutive_auth_failures < 10 {
79 30
80 } else {
81 60
82 };
83 if consecutive_auth_failures == 1
84 || consecutive_auth_failures.is_multiple_of(10)
85 {
86 tracing::error!(
87 error = %e,
88 failures = consecutive_auth_failures,
89 "Telegram poll authentication failed (likely invalid/revoked token). Repair with: `roboticus keystore set telegram_bot_token \"<TOKEN>\"` then restart."
90 );
91 } else {
92 tracing::warn!(
93 error = %e,
94 failures = consecutive_auth_failures,
95 "Telegram auth failure persists; backing off"
96 );
97 }
98 tokio::time::sleep(std::time::Duration::from_secs(backoff)).await;
99 } else {
100 consecutive_auth_failures = 0;
101 tracing::error!(error = %e, "Telegram poll error, backing off 5s");
102 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
103 }
104 }
105 }
106 }
107}
108
109pub async fn discord_poll_loop(state: AppState) {
110 static CHANNEL_SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
111 std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));
112 let adapter = match &state.discord {
113 Some(a) => a.clone(),
114 None => return,
115 };
116 tracing::info!("Discord inbound loop started");
117 loop {
118 match adapter.recv().await {
119 Ok(Some(inbound)) => {
120 state.channel_router.record_received("discord").await;
121 let state = state.clone();
122 let semaphore = Arc::clone(&CHANNEL_SEMAPHORE);
123 tokio::spawn(async move {
124 let _permit = match semaphore.acquire_owned().await {
125 Ok(p) => p,
126 Err(_) => return,
127 };
128 if let Err(e) = super::process_channel_message(&state, inbound).await {
129 state
130 .channel_router
131 .record_processing_error("discord", e.clone())
132 .await;
133 tracing::error!(error = %e, "Discord message processing failed");
134 }
135 });
136 }
137 Ok(None) => tokio::time::sleep(std::time::Duration::from_millis(300)).await,
138 Err(e) => {
139 tracing::error!(error = %e, "Discord inbound loop error, backing off 5s");
140 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
141 }
142 }
143 }
144}
145
146pub async fn signal_poll_loop(state: AppState) {
147 static CHANNEL_SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
148 std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));
149 let adapter = match &state.signal {
150 Some(a) => a.clone(),
151 None => return,
152 };
153 tracing::info!("Signal inbound loop started");
154 loop {
155 match adapter.recv().await {
156 Ok(Some(inbound)) => {
157 state.channel_router.record_received("signal").await;
158 let state = state.clone();
159 let semaphore = Arc::clone(&CHANNEL_SEMAPHORE);
160 tokio::spawn(async move {
161 let _permit = match semaphore.acquire_owned().await {
162 Ok(p) => p,
163 Err(_) => return,
164 };
165 if let Err(e) = super::process_channel_message(&state, inbound).await {
166 state
167 .channel_router
168 .record_processing_error("signal", e.clone())
169 .await;
170 tracing::error!(error = %e, "Signal message processing failed");
171 }
172 });
173 }
174 Ok(None) => tokio::time::sleep(std::time::Duration::from_millis(300)).await,
175 Err(e) => {
176 tracing::error!(error = %e, "Signal inbound loop error, backing off 5s");
177 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
178 }
179 }
180 }
181}
182
183pub async fn email_poll_loop(state: AppState) {
184 static CHANNEL_SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
185 std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));
186 let adapter = match &state.email {
187 Some(a) => a.clone(),
188 None => return,
189 };
190 tracing::info!("Email inbound loop started");
191 loop {
192 match adapter.recv().await {
193 Ok(Some(inbound)) => {
194 state.channel_router.record_received("email").await;
195 let state = state.clone();
196 let semaphore = Arc::clone(&CHANNEL_SEMAPHORE);
197 tokio::spawn(async move {
198 let _permit = match semaphore.acquire_owned().await {
199 Ok(p) => p,
200 Err(_) => return,
201 };
202 if let Err(e) = super::process_channel_message(&state, inbound).await {
203 state
204 .channel_router
205 .record_processing_error("email", e.clone())
206 .await;
207 tracing::error!(error = %e, "Email message processing failed");
208 }
209 });
210 }
211 Ok(None) => tokio::time::sleep(std::time::Duration::from_secs(1)).await,
212 Err(e) => {
213 tracing::error!(error = %e, "Email inbound loop error, backing off 5s");
214 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
215 }
216 }
217 }
218}