roboticus_api/api/routes/agent/
poll_loops.rs1use std::sync::Arc;
4use std::time::Duration;
5
6use roboticus_channels::ChannelAdapter;
7
8use super::AppState;
9
/// Number of permits placed in each platform's processing semaphore.
const CHANNEL_CONCURRENCY: usize = 8;
12
/// Text sent back to the chat when processing an inbound message fails and the
/// platform is configured to reply on error.
pub(crate) const CHANNEL_PROCESSING_ERROR_REPLY: &str =
    "I hit an internal processing error while handling that message. Please retry in a moment.";
15
/// Per-platform settings consumed by `channel_poll_loop`.
struct PollConfig {
    /// Platform label used in log fields and passed to the channel router.
    platform: &'static str,
    /// Pause applied after the adapter returns no message; a zero duration
    /// skips the pause entirely.
    idle_sleep: Duration,
    /// When `true`, a processing failure also sends
    /// `CHANNEL_PROCESSING_ERROR_REPLY` back to the originating chat.
    reply_on_error: bool,
}
25
26async fn channel_poll_loop(
29 state: AppState,
30 adapter: Arc<dyn ChannelAdapter>,
31 cfg: PollConfig,
32 semaphore: &'static std::sync::LazyLock<Arc<tokio::sync::Semaphore>>,
33) {
34 tracing::info!(platform = cfg.platform, "inbound poll loop started");
35
36 loop {
37 match adapter.recv().await {
38 Ok(Some(inbound)) => {
39 state.channel_router.record_received(cfg.platform).await;
40 let state = state.clone();
41 let semaphore = Arc::clone(semaphore);
42 let platform = cfg.platform;
43 let reply_on_error = cfg.reply_on_error;
44 let inbound_for_error = inbound.clone();
45 tokio::spawn(async move {
46 let _permit = match semaphore.acquire_owned().await {
47 Ok(p) => p,
48 Err(_) => return,
49 };
50 if let Err(e) = super::process_channel_message(&state, inbound).await {
51 state
52 .channel_router
53 .record_processing_error(platform, e.clone())
54 .await;
55 if reply_on_error {
56 let chat_id = super::resolve_channel_chat_id(&inbound_for_error);
57 if let Err(send_err) = state
58 .channel_router
59 .send_reply(
60 platform,
61 &chat_id,
62 CHANNEL_PROCESSING_ERROR_REPLY.to_string(),
63 )
64 .await
65 {
66 tracing::warn!(
67 error = %send_err,
68 platform,
69 "failed to send processing failure reply"
70 );
71 }
72 }
73 tracing::error!(error = %e, platform, "message processing failed");
74 }
75 });
76 }
77 Ok(None) => {
78 if !cfg.idle_sleep.is_zero() {
79 tokio::time::sleep(cfg.idle_sleep).await;
80 }
81 }
82 Err(e) => {
83 tracing::error!(
84 error = %e,
85 platform = cfg.platform,
86 "poll error, backing off 5s"
87 );
88 tokio::time::sleep(Duration::from_secs(5)).await;
89 }
90 }
91 }
92}
93
94pub async fn telegram_poll_loop(state: AppState) {
95 static SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
96 std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));
97
98 let adapter = match &state.telegram {
99 Some(a) => a.clone(),
100 None => return,
101 };
102
103 channel_poll_loop(
107 state,
108 adapter,
109 PollConfig {
110 platform: "telegram",
111 idle_sleep: Duration::ZERO,
112 reply_on_error: true,
113 },
114 &SEMAPHORE,
115 )
116 .await;
117}
118
119pub async fn discord_poll_loop(state: AppState) {
120 static SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
121 std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));
122
123 let adapter = match &state.discord {
124 Some(a) => a.clone(),
125 None => return,
126 };
127
128 channel_poll_loop(
129 state,
130 adapter,
131 PollConfig {
132 platform: "discord",
133 idle_sleep: Duration::from_millis(300),
134 reply_on_error: false,
135 },
136 &SEMAPHORE,
137 )
138 .await;
139}
140
141pub async fn signal_poll_loop(state: AppState) {
142 static SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
143 std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));
144
145 let adapter = match &state.signal {
146 Some(a) => a.clone(),
147 None => return,
148 };
149
150 channel_poll_loop(
151 state,
152 adapter,
153 PollConfig {
154 platform: "signal",
155 idle_sleep: Duration::from_millis(300),
156 reply_on_error: false,
157 },
158 &SEMAPHORE,
159 )
160 .await;
161}
162
163pub async fn email_poll_loop(state: AppState) {
164 static SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
165 std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));
166
167 let adapter = match &state.email {
168 Some(a) => a.clone(),
169 None => return,
170 };
171
172 channel_poll_loop(
173 state,
174 adapter,
175 PollConfig {
176 platform: "email",
177 idle_sleep: Duration::from_secs(1),
178 reply_on_error: false,
179 },
180 &SEMAPHORE,
181 )
182 .await;
183}