Skip to main content

roboticus_api/api/routes/agent/
poll_loops.rs

1//! Generic channel polling loop and platform-specific wrappers.
2
3use std::sync::Arc;
4use std::time::Duration;
5
6use roboticus_channels::ChannelAdapter;
7
8use super::AppState;
9
/// Number of permits in each platform's semaphore, i.e. the upper bound on
/// channel messages being processed at the same time for a single platform.
const CHANNEL_CONCURRENCY: usize = 8;
12
/// User-facing text sent back on the originating channel when processing an
/// inbound message fails and the platform is configured to reply on error.
pub(crate) const CHANNEL_PROCESSING_ERROR_REPLY: &str =
    "I hit an internal processing error while handling that message. Please retry in a moment.";
15
/// Per-platform knobs consumed by the generic channel poll loop.
struct PollConfig {
    /// Platform identifier; used as the `platform` log field and passed to the
    /// channel router's tracking and reply calls.
    platform: &'static str,
    /// Pause applied when `recv()` yields `Ok(None)`. A zero duration skips the
    /// sleep entirely.
    idle_sleep: Duration,
    /// When `true`, a processing failure also sends
    /// `CHANNEL_PROCESSING_ERROR_REPLY` back to the originating chat.
    reply_on_error: bool,
}
25
26/// Generic poll loop for any `ChannelAdapter`. Handles recv → spawn → process
27/// with semaphore-bounded concurrency and error recovery.
28async fn channel_poll_loop(
29    state: AppState,
30    adapter: Arc<dyn ChannelAdapter>,
31    cfg: PollConfig,
32    semaphore: &'static std::sync::LazyLock<Arc<tokio::sync::Semaphore>>,
33) {
34    tracing::info!(platform = cfg.platform, "inbound poll loop started");
35
36    loop {
37        match adapter.recv().await {
38            Ok(Some(inbound)) => {
39                state.channel_router.record_received(cfg.platform).await;
40                let state = state.clone();
41                let semaphore = Arc::clone(semaphore);
42                let platform = cfg.platform;
43                let reply_on_error = cfg.reply_on_error;
44                let inbound_for_error = inbound.clone();
45                tokio::spawn(async move {
46                    let _permit = match semaphore.acquire_owned().await {
47                        Ok(p) => p,
48                        Err(_) => return,
49                    };
50                    if let Err(e) = super::process_channel_message(&state, inbound).await {
51                        state
52                            .channel_router
53                            .record_processing_error(platform, e.clone())
54                            .await;
55                        if reply_on_error {
56                            let chat_id = super::resolve_channel_chat_id(&inbound_for_error);
57                            if let Err(send_err) = state
58                                .channel_router
59                                .send_reply(
60                                    platform,
61                                    &chat_id,
62                                    CHANNEL_PROCESSING_ERROR_REPLY.to_string(),
63                                )
64                                .await
65                            {
66                                tracing::warn!(
67                                    error = %send_err,
68                                    platform,
69                                    "failed to send processing failure reply"
70                                );
71                            }
72                        }
73                        tracing::error!(error = %e, platform, "message processing failed");
74                    }
75                });
76            }
77            Ok(None) => {
78                if !cfg.idle_sleep.is_zero() {
79                    tokio::time::sleep(cfg.idle_sleep).await;
80                }
81            }
82            Err(e) => {
83                tracing::error!(
84                    error = %e,
85                    platform = cfg.platform,
86                    "poll error, backing off 5s"
87                );
88                tokio::time::sleep(Duration::from_secs(5)).await;
89            }
90        }
91    }
92}
93
94pub async fn telegram_poll_loop(state: AppState) {
95    static SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
96        std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));
97
98    let adapter = match &state.telegram {
99        Some(a) => a.clone(),
100        None => return,
101    };
102
103    // Telegram uses the generic loop with error replies enabled.
104    // Auth-failure escalation is handled in the adapter's recv() implementation
105    // via HTTP 401/404 detection.
106    channel_poll_loop(
107        state,
108        adapter,
109        PollConfig {
110            platform: "telegram",
111            idle_sleep: Duration::ZERO,
112            reply_on_error: true,
113        },
114        &SEMAPHORE,
115    )
116    .await;
117}
118
119pub async fn discord_poll_loop(state: AppState) {
120    static SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
121        std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));
122
123    let adapter = match &state.discord {
124        Some(a) => a.clone(),
125        None => return,
126    };
127
128    channel_poll_loop(
129        state,
130        adapter,
131        PollConfig {
132            platform: "discord",
133            idle_sleep: Duration::from_millis(300),
134            reply_on_error: false,
135        },
136        &SEMAPHORE,
137    )
138    .await;
139}
140
141pub async fn signal_poll_loop(state: AppState) {
142    static SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
143        std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));
144
145    let adapter = match &state.signal {
146        Some(a) => a.clone(),
147        None => return,
148    };
149
150    channel_poll_loop(
151        state,
152        adapter,
153        PollConfig {
154            platform: "signal",
155            idle_sleep: Duration::from_millis(300),
156            reply_on_error: false,
157        },
158        &SEMAPHORE,
159    )
160    .await;
161}
162
163pub async fn email_poll_loop(state: AppState) {
164    static SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
165        std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));
166
167    let adapter = match &state.email {
168        Some(a) => a.clone(),
169        None => return,
170    };
171
172    channel_poll_loop(
173        state,
174        adapter,
175        PollConfig {
176            platform: "email",
177            idle_sleep: Duration::from_secs(1),
178            reply_on_error: false,
179        },
180        &SEMAPHORE,
181    )
182    .await;
183}