roboticus_api/api/routes/agent/
poll_loops.rs1use std::sync::Arc;
4
5use roboticus_channels::ChannelAdapter;
6
7use super::AppState;
8
/// User-facing fallback text sent as a reply when handling an inbound channel message fails.
///
/// The wording is deliberately generic so that no internal error detail reaches the sender.
pub(crate) const CHANNEL_PROCESSING_ERROR_REPLY: &str =
    "I hit an internal processing error while handling that message. Please retry in a moment.";
11
12pub async fn telegram_poll_loop(state: AppState) {
13 static CHANNEL_SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
14 std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(8)));
15
16 let adapter = match &state.telegram {
17 Some(a) => a.clone(),
18 None => return,
19 };
20
21 tracing::info!("Telegram long-poll loop started");
22 let mut consecutive_auth_failures: u32 = 0;
23
24 loop {
25 match adapter.recv().await {
26 Ok(Some(inbound)) => {
27 consecutive_auth_failures = 0;
28 state.channel_router.record_received("telegram").await;
29 let state = state.clone();
30 let semaphore = Arc::clone(&CHANNEL_SEMAPHORE);
31 let inbound_for_error = inbound.clone();
32 tokio::spawn(async move {
33 let _permit = match semaphore.acquire_owned().await {
34 Ok(p) => p,
35 Err(_) => return,
36 };
37 if let Err(e) = super::process_channel_message(&state, inbound).await {
38 state
39 .channel_router
40 .record_processing_error("telegram", e.clone())
41 .await;
42 let chat_id = super::resolve_channel_chat_id(&inbound_for_error);
43 if let Err(send_err) = state
44 .channel_router
45 .send_reply(
46 "telegram",
47 &chat_id,
48 CHANNEL_PROCESSING_ERROR_REPLY.to_string(),
49 )
50 .await
51 {
52 tracing::warn!(
53 error = %send_err,
54 "failed to send Telegram processing failure reply"
55 );
56 }
57 tracing::error!(error = %e, "Telegram message processing failed");
58 }
59 });
60 }
61 Ok(None) => {
62 consecutive_auth_failures = 0;
63 }
64 Err(e) => {
65 let err_text = e.to_string();
66 let looks_like_auth = err_text.contains("Telegram API 404")
67 || err_text.contains("Telegram API 401")
68 || err_text
69 .to_ascii_lowercase()
70 .contains("invalid/revoked bot token");
71 if looks_like_auth {
72 consecutive_auth_failures = consecutive_auth_failures.saturating_add(1);
73 let backoff = if consecutive_auth_failures < 3 {
74 15
75 } else if consecutive_auth_failures < 10 {
76 30
77 } else {
78 60
79 };
80 if consecutive_auth_failures == 1
81 || consecutive_auth_failures.is_multiple_of(10)
82 {
83 tracing::error!(
84 error = %e,
85 failures = consecutive_auth_failures,
86 "Telegram poll authentication failed (likely invalid/revoked token). Repair with: `roboticus keystore set telegram_bot_token \"<TOKEN>\"` then restart."
87 );
88 } else {
89 tracing::warn!(
90 error = %e,
91 failures = consecutive_auth_failures,
92 "Telegram auth failure persists; backing off"
93 );
94 }
95 tokio::time::sleep(std::time::Duration::from_secs(backoff)).await;
96 } else {
97 consecutive_auth_failures = 0;
98 tracing::error!(error = %e, "Telegram poll error, backing off 5s");
99 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
100 }
101 }
102 }
103 }
104}
105
106pub async fn discord_poll_loop(state: AppState) {
107 static CHANNEL_SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
108 std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(8)));
109 let adapter = match &state.discord {
110 Some(a) => a.clone(),
111 None => return,
112 };
113 tracing::info!("Discord inbound loop started");
114 loop {
115 match adapter.recv().await {
116 Ok(Some(inbound)) => {
117 state.channel_router.record_received("discord").await;
118 let state = state.clone();
119 let semaphore = Arc::clone(&CHANNEL_SEMAPHORE);
120 tokio::spawn(async move {
121 let _permit = match semaphore.acquire_owned().await {
122 Ok(p) => p,
123 Err(_) => return,
124 };
125 if let Err(e) = super::process_channel_message(&state, inbound).await {
126 state
127 .channel_router
128 .record_processing_error("discord", e.clone())
129 .await;
130 tracing::error!(error = %e, "Discord message processing failed");
131 }
132 });
133 }
134 Ok(None) => tokio::time::sleep(std::time::Duration::from_millis(300)).await,
135 Err(e) => {
136 tracing::error!(error = %e, "Discord inbound loop error, backing off 5s");
137 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
138 }
139 }
140 }
141}
142
143pub async fn signal_poll_loop(state: AppState) {
144 static CHANNEL_SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
145 std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(8)));
146 let adapter = match &state.signal {
147 Some(a) => a.clone(),
148 None => return,
149 };
150 tracing::info!("Signal inbound loop started");
151 loop {
152 match adapter.recv().await {
153 Ok(Some(inbound)) => {
154 state.channel_router.record_received("signal").await;
155 let state = state.clone();
156 let semaphore = Arc::clone(&CHANNEL_SEMAPHORE);
157 tokio::spawn(async move {
158 let _permit = match semaphore.acquire_owned().await {
159 Ok(p) => p,
160 Err(_) => return,
161 };
162 if let Err(e) = super::process_channel_message(&state, inbound).await {
163 state
164 .channel_router
165 .record_processing_error("signal", e.clone())
166 .await;
167 tracing::error!(error = %e, "Signal message processing failed");
168 }
169 });
170 }
171 Ok(None) => tokio::time::sleep(std::time::Duration::from_millis(300)).await,
172 Err(e) => {
173 tracing::error!(error = %e, "Signal inbound loop error, backing off 5s");
174 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
175 }
176 }
177 }
178}
179
180pub async fn email_poll_loop(state: AppState) {
181 static CHANNEL_SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
182 std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(8)));
183 let adapter = match &state.email {
184 Some(a) => a.clone(),
185 None => return,
186 };
187 tracing::info!("Email inbound loop started");
188 loop {
189 match adapter.recv().await {
190 Ok(Some(inbound)) => {
191 state.channel_router.record_received("email").await;
192 let state = state.clone();
193 let semaphore = Arc::clone(&CHANNEL_SEMAPHORE);
194 tokio::spawn(async move {
195 let _permit = match semaphore.acquire_owned().await {
196 Ok(p) => p,
197 Err(_) => return,
198 };
199 if let Err(e) = super::process_channel_message(&state, inbound).await {
200 state
201 .channel_router
202 .record_processing_error("email", e.clone())
203 .await;
204 tracing::error!(error = %e, "Email message processing failed");
205 }
206 });
207 }
208 Ok(None) => tokio::time::sleep(std::time::Duration::from_secs(1)).await,
209 Err(e) => {
210 tracing::error!(error = %e, "Email inbound loop error, backing off 5s");
211 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
212 }
213 }
214 }
215}