use std::sync::Arc;
use std::time::Duration;
use roboticus_channels::ChannelAdapter;
use super::AppState;
/// Number of permits each platform's poll-loop semaphore is created with.
///
/// Message-handling tasks spawned by a poll loop must acquire one permit
/// before processing, so at most this many messages per platform are
/// processed at the same time.
const CHANNEL_CONCURRENCY: usize = 8;
/// Reply text sent to the originating chat when processing an inbound message
/// fails and the platform's poll configuration has `reply_on_error` enabled.
pub(crate) const CHANNEL_PROCESSING_ERROR_REPLY: &str =
"I hit an internal processing error while handling that message. Please retry in a moment.";
/// Per-platform settings consumed by `channel_poll_loop`.
struct PollConfig {
/// Platform label; used as the `platform` field in log events and passed to
/// the channel router when recording and replying.
platform: &'static str,
/// How long to sleep after the adapter reports no pending message
/// (`Ok(None)`). A zero duration skips the sleep entirely.
idle_sleep: Duration,
/// When `true`, a processing failure also sends
/// `CHANNEL_PROCESSING_ERROR_REPLY` back to the originating chat.
reply_on_error: bool,
}
async fn channel_poll_loop(
state: AppState,
adapter: Arc<dyn ChannelAdapter>,
cfg: PollConfig,
semaphore: &'static std::sync::LazyLock<Arc<tokio::sync::Semaphore>>,
) {
tracing::info!(platform = cfg.platform, "inbound poll loop started");
loop {
match adapter.recv().await {
Ok(Some(inbound)) => {
state.channel_router.record_received(cfg.platform).await;
let state = state.clone();
let semaphore = Arc::clone(semaphore);
let platform = cfg.platform;
let reply_on_error = cfg.reply_on_error;
let inbound_for_error = inbound.clone();
tokio::spawn(async move {
let _permit = match semaphore.acquire_owned().await {
Ok(p) => p,
Err(_) => return,
};
if let Err(e) = super::process_channel_message(&state, inbound).await {
state
.channel_router
.record_processing_error(platform, e.clone())
.await;
if reply_on_error {
let chat_id = super::resolve_channel_chat_id(&inbound_for_error);
if let Err(send_err) = state
.channel_router
.send_reply(
platform,
&chat_id,
CHANNEL_PROCESSING_ERROR_REPLY.to_string(),
)
.await
{
tracing::warn!(
error = %send_err,
platform,
"failed to send processing failure reply"
);
}
}
tracing::error!(error = %e, platform, "message processing failed");
}
});
}
Ok(None) => {
if !cfg.idle_sleep.is_zero() {
tokio::time::sleep(cfg.idle_sleep).await;
}
}
Err(e) => {
tracing::error!(
error = %e,
platform = cfg.platform,
"poll error, backing off 5s"
);
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
}
}
/// Runs the Telegram inbound poll loop.
///
/// Returns immediately when `state.telegram` is `None`; otherwise hands the
/// adapter to `channel_poll_loop` with no idle sleep and with error replies
/// enabled. The semaphore is a function-local static, so its
/// `CHANNEL_CONCURRENCY` permits are not shared with other platforms.
pub async fn telegram_poll_loop(state: AppState) {
    use std::sync::LazyLock;

    static PERMITS: LazyLock<Arc<tokio::sync::Semaphore>> =
        LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));

    let Some(adapter) = state.telegram.clone() else {
        return;
    };

    let cfg = PollConfig {
        platform: "telegram",
        idle_sleep: Duration::ZERO,
        reply_on_error: true,
    };
    channel_poll_loop(state, adapter, cfg, &PERMITS).await;
}
/// Runs the Discord inbound poll loop.
///
/// Returns immediately when `state.discord` is `None`; otherwise hands the
/// adapter to `channel_poll_loop` with a 300 ms idle sleep and error replies
/// disabled. The semaphore is a function-local static, so its
/// `CHANNEL_CONCURRENCY` permits are not shared with other platforms.
pub async fn discord_poll_loop(state: AppState) {
    use std::sync::LazyLock;

    static PERMITS: LazyLock<Arc<tokio::sync::Semaphore>> =
        LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));

    let Some(adapter) = state.discord.clone() else {
        return;
    };

    let cfg = PollConfig {
        platform: "discord",
        idle_sleep: Duration::from_millis(300),
        reply_on_error: false,
    };
    channel_poll_loop(state, adapter, cfg, &PERMITS).await;
}
/// Runs the Signal inbound poll loop.
///
/// Returns immediately when `state.signal` is `None`; otherwise hands the
/// adapter to `channel_poll_loop` with a 300 ms idle sleep and error replies
/// disabled. The semaphore is a function-local static, so its
/// `CHANNEL_CONCURRENCY` permits are not shared with other platforms.
pub async fn signal_poll_loop(state: AppState) {
    use std::sync::LazyLock;

    static PERMITS: LazyLock<Arc<tokio::sync::Semaphore>> =
        LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));

    let Some(adapter) = state.signal.clone() else {
        return;
    };

    let cfg = PollConfig {
        platform: "signal",
        idle_sleep: Duration::from_millis(300),
        reply_on_error: false,
    };
    channel_poll_loop(state, adapter, cfg, &PERMITS).await;
}
/// Runs the email inbound poll loop.
///
/// Returns immediately when `state.email` is `None`; otherwise hands the
/// adapter to `channel_poll_loop` with a one-second idle sleep and error
/// replies disabled. The semaphore is a function-local static, so its
/// `CHANNEL_CONCURRENCY` permits are not shared with other platforms.
pub async fn email_poll_loop(state: AppState) {
    use std::sync::LazyLock;

    static PERMITS: LazyLock<Arc<tokio::sync::Semaphore>> =
        LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));

    let Some(adapter) = state.email.clone() else {
        return;
    };

    let cfg = PollConfig {
        platform: "email",
        idle_sleep: Duration::from_secs(1),
        reply_on_error: false,
    };
    channel_poll_loop(state, adapter, cfg, &PERMITS).await;
}