use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use async_trait::async_trait;
use regex::Regex;
use sea_orm::DatabaseConnection;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use crate::llm::LlmProvider;
use crate::AppState;
/// A lifecycle notification made of a kind and a text payload.
///
/// Instances are normally built through `ProcessEvent::started`,
/// `ProcessEvent::progress`, `ProcessEvent::completed` and
/// `ProcessEvent::failed`.
#[derive(Debug, Clone)]
pub struct ProcessEvent {
/// Random (v4) UUID assigned by the constructors.
pub id: Uuid,
/// Lifecycle stage this event reports.
pub kind: ProcessEventKind,
/// Free-form text carried by the event; the constructors log a short preview of it.
pub content: String,
}
/// Lifecycle stage reported by a [`ProcessEvent`].
#[derive(Debug, Clone)]
pub enum ProcessEventKind {
    /// Work has begun.
    Started,
    /// Work is still under way.
    Progress,
    /// Work finished successfully.
    Completed,
    /// Work ended with an error.
    Failed,
}

impl ProcessEventKind {
    /// Returns the lowercase name of the variant.
    ///
    /// This is the string handed to the process-event store when an event is
    /// persisted, so the spellings must stay stable.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Started => "started",
            Self::Progress => "progress",
            Self::Completed => "completed",
            Self::Failed => "failed",
        }
    }
}
/// A chat message travelling through the loop, either inbound or outbound.
#[derive(Debug, Clone)]
pub struct MessageEvent {
/// Random (v4) UUID assigned when the event is constructed.
pub id: Uuid,
/// Whether the message was received or is being delivered.
pub kind: MessageEventKind,
/// Message text.
pub content: String,
/// Gateway channel the message belongs to; `delivered` leaves it empty until `with_channel` is called.
pub channel_id: String,
/// Author of the message; `delivered` leaves it empty.
pub user_id: String,
/// Thread identifier, or `None` when the message is not associated with a thread.
pub thread_ts: Option<String>,
/// Id of the stored conversation row this event was built from, if any.
pub conversation_id: Option<i32>,
}
/// Direction of a [`MessageEvent`].
#[derive(Debug, Clone)]
pub enum MessageEventKind {
/// Inbound message: set by `MessageEvent::received` and for rows picked up by the conversations poll.
Received,
/// Outbound message: set by `MessageEvent::delivered`.
Delivered,
}
impl ProcessEvent {
    /// Maximum number of bytes of `content` echoed into the debug log preview.
    const LOG_PREVIEW_MAX_BYTES: usize = 80;

    /// Returns the longest prefix of `content` that is at most
    /// `LOG_PREVIEW_MAX_BYTES` bytes long and ends on a UTF-8 character
    /// boundary.
    ///
    /// Slicing a `str` at a raw byte offset panics when the offset lands inside
    /// a multi-byte character, so the cut point is moved backwards until it is
    /// valid. For ASCII content the result equals a plain `[..80]` slice.
    fn log_preview(content: &str) -> &str {
        let mut end = content.len().min(Self::LOG_PREVIEW_MAX_BYTES);
        // Index 0 is always a char boundary, so this loop terminates.
        while !content.is_char_boundary(end) {
            end -= 1;
        }
        &content[..end]
    }

    /// Builds a `Started` event with a fresh random id and logs a preview of
    /// `content` at debug level.
    pub fn started(content: impl Into<String>) -> Self {
        let content = content.into();
        debug!(kind = "started", preview = %Self::log_preview(&content), "ProcessEvent::started");
        Self {
            id: Uuid::new_v4(),
            kind: ProcessEventKind::Started,
            content,
        }
    }

    /// Builds a `Progress` event with a fresh random id and logs a preview of
    /// `content` at debug level.
    pub fn progress(content: impl Into<String>) -> Self {
        let content = content.into();
        debug!(kind = "progress", preview = %Self::log_preview(&content), "ProcessEvent::progress");
        Self {
            id: Uuid::new_v4(),
            kind: ProcessEventKind::Progress,
            content,
        }
    }

    /// Builds a `Completed` event with a fresh random id and logs a preview of
    /// `content` at debug level.
    pub fn completed(content: impl Into<String>) -> Self {
        let content = content.into();
        debug!(kind = "completed", preview = %Self::log_preview(&content), "ProcessEvent::completed");
        Self {
            id: Uuid::new_v4(),
            kind: ProcessEventKind::Completed,
            content,
        }
    }

    /// Builds a `Failed` event with a fresh random id and logs a preview of
    /// `content` at debug level.
    pub fn failed(content: impl Into<String>) -> Self {
        let content = content.into();
        debug!(kind = "failed", preview = %Self::log_preview(&content), "ProcessEvent::failed");
        Self {
            id: Uuid::new_v4(),
            kind: ProcessEventKind::Failed,
            content,
        }
    }
}
impl MessageEvent {
    /// Builds an inbound (`Received`) message with a fresh random id.
    ///
    /// `thread_ts` and `conversation_id` start out as `None`; use the `with_*`
    /// builders to fill them in.
    pub fn received(
        content: impl Into<String>,
        channel_id: impl Into<String>,
        user_id: impl Into<String>,
    ) -> Self {
        let id = Uuid::new_v4();
        let content = content.into();
        let channel_id = channel_id.into();
        let user_id = user_id.into();
        Self {
            id,
            kind: MessageEventKind::Received,
            content,
            channel_id,
            user_id,
            thread_ts: None,
            conversation_id: None,
        }
    }

    /// Returns the event with its thread identifier set to `thread_ts`.
    pub fn with_thread_ts(self, thread_ts: impl Into<String>) -> Self {
        Self {
            thread_ts: Some(thread_ts.into()),
            ..self
        }
    }

    /// Returns the event with its channel replaced by `channel_id`.
    pub fn with_channel(self, channel_id: impl Into<String>) -> Self {
        Self {
            channel_id: channel_id.into(),
            ..self
        }
    }

    /// Returns the event linked to the conversation row `id`.
    pub fn with_conversation_id(self, id: i32) -> Self {
        Self {
            conversation_id: Some(id),
            ..self
        }
    }

    /// Builds an outbound (`Delivered`) message with a fresh random id.
    ///
    /// Channel and user are left empty and no thread or conversation is
    /// attached; callers add the channel with [`MessageEvent::with_channel`].
    pub fn delivered(content: impl Into<String>) -> Self {
        let id = Uuid::new_v4();
        let content = content.into();
        Self {
            id,
            kind: MessageEventKind::Delivered,
            content,
            channel_id: String::default(),
            user_id: String::default(),
            thread_ts: None,
            conversation_id: None,
        }
    }
}
/// Common surface of an agent-facing gateway: configuration, optional
/// application state, optional LLM provider, and a default inbound-message
/// handler.
#[async_trait]
pub trait AgentGateway: Send + Sync {
    /// Gateway-specific configuration type.
    type Config: Send + Sync;

    /// Returns the gateway configuration.
    fn config(&self) -> &Self::Config;

    /// Returns the shared application state, if this gateway has one.
    fn state(&self) -> Option<&AppState>;

    /// Returns the LLM provider, if this gateway has one.
    fn llm(&self) -> Option<&dyn LlmProvider>;

    /// Default inbound handler: stores `event` as a user-role conversation row.
    ///
    /// Does nothing and returns `Ok(())` when [`AgentGateway::state`] is `None`.
    /// A missing thread identifier is stored as an empty string.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the conversation insert.
    async fn on_message(&self, event: MessageEvent) -> Result<()> {
        use crate::entities::conversation::{ConversationRole, Model as Conversation};

        // Without application state there is no database to write to.
        let state = match self.state() {
            Some(state) => state,
            None => return Ok(()),
        };

        let thread = event.thread_ts.as_deref().unwrap_or_default();
        Conversation::insert(
            &state.db,
            None,
            &event.channel_id,
            thread,
            &event.user_id,
            ConversationRole::User,
            &event.content,
        )
        .await?;

        Ok(())
    }
}
/// A command handler that `SensoriumLoop` dispatches matching messages to.
#[async_trait]
pub trait Worker: Send + Sync {
/// Short identifier used in log fields and in duplicate-registration errors.
fn name(&self) -> &'static str;
/// Human-readable summary shown by `build_help_response`.
fn description(&self) -> &'static str;
/// Example invocation shown by `build_help_response`.
fn example(&self) -> &'static str;
/// Handles one message whose content matched this worker's pattern.
///
/// `args` holds the key/value pairs that `parse_kv` extracted from the
/// message content. `SensoriumLoop::run` only distinguishes `Ok` from `Err`
/// and ignores the boolean.
/// NOTE(review): the intended meaning of the returned `bool` is not visible
/// in this file — confirm against the implementors.
async fn handle(
&self,
db: DatabaseConnection,
msg: MessageEvent,
args: HashMap<String, String>,
) -> Result<bool>;
}
/// Extracts `key=value` arguments from a chat command.
///
/// Before the pairs are parsed the input is reduced in three steps:
///
/// 1. a leading `<@...>` mention is removed (up to the first `>`),
/// 2. a leading `/command` token is removed,
/// 3. one further leading token is removed when it contains no `=`
///    (a command or sub-command word).
///
/// Keys are lower-cased; values may be bare or wrapped in double quotes.
/// Tokens without `=` are ignored and later duplicates overwrite earlier ones.
pub fn parse_kv(content: &str) -> HashMap<String, String> {
    let text = content.trim();

    // Step 1: strip the mention, if the message starts with one.
    let text = if text.starts_with("<@") {
        text.split_once('>')
            .map_or(text, |(_, after)| after.trim_start())
    } else {
        text
    };

    // Step 2: strip the slash command itself; a lone command has no arguments.
    let text = if text.starts_with('/') {
        text.split_once(char::is_whitespace)
            .map_or("", |(_, after)| after.trim())
    } else {
        text
    };

    // Step 3: strip one leading bare word that is not a `key=value` pair.
    let text = text.trim();
    let args = text
        .split_once(char::is_whitespace)
        .filter(|(head, _)| !head.contains('='))
        .map_or(text, |(_, tail)| tail.trim());

    parse_kv_pairs(args)
}

/// Parses whitespace-separated `key=value` tokens into a map.
///
/// * A value that starts with `"` runs to the next `"` (or to the end of the
///   input when the quote is never closed); no escape sequences are recognised.
/// * A bare value runs to the next whitespace and may be empty.
/// * A token without `=` is skipped.
/// * A token that begins with `=` (empty key) stops parsing; pairs collected
///   so far are returned.
fn parse_kv_pairs(input: &str) -> HashMap<String, String> {
    let mut pairs = HashMap::new();
    let mut cursor = input.chars().peekable();

    loop {
        // Skip separators between tokens.
        while cursor.next_if(|c| c.is_whitespace()).is_some() {}
        if cursor.peek().is_none() {
            break;
        }

        // The key runs up to '=' or whitespace; neither is part of the key.
        let mut key = String::new();
        while let Some(c) = cursor.next_if(|c| *c != '=' && !c.is_whitespace()) {
            key.push(c);
        }
        let saw_eq = cursor.next_if_eq(&'=').is_some();

        if key.is_empty() {
            break;
        }
        if !saw_eq {
            continue;
        }

        let value: String = if cursor.next_if_eq(&'"').is_some() {
            // `take_while` also consumes the closing quote.
            cursor.by_ref().take_while(|c| *c != '"').collect()
        } else {
            let mut bare = String::new();
            while let Some(c) = cursor.next_if(|c| !c.is_whitespace()) {
                bare.push(c);
            }
            bare
        };

        pairs.insert(key.to_lowercase(), value);
    }

    pairs
}
/// Renders the `/help` reply: a header followed by one
/// `example — description` line per registered worker, in registration order.
pub fn build_help_response(workers: &[WorkerRegistration]) -> String {
    let mut response = String::from(":book: *Robson \u{2014} Available Commands*\n\n");
    response.extend(workers.iter().map(|registration| {
        format!(
            "{} \u{2014} {}\n",
            registration.worker.example(),
            registration.worker.description()
        )
    }));
    response
}
/// Pairs a worker with the compiled pattern that selects it.
pub struct WorkerRegistration {
/// Regex tested against message content to decide whether `worker` handles it.
pub pattern: Regex,
/// The handler invoked for matching messages.
pub worker: Arc<dyn Worker>,
}
/// A transport that `SensoriumLoop` starts and delivers outbound messages through.
#[async_trait]
pub trait Gateway: Send + Sync {
/// Name used in logs, to match stored gateway rows, and to key delivery records.
fn name(&self) -> &'static str;
/// Sends one outbound message; an `Err` makes the delivery poll record a failure for retry.
async fn send(&self, msg: MessageEvent) -> Result<()>;
/// Runs the gateway. `SensoriumLoop::run` spawns this on its own task and logs an `Err` result.
async fn start(&self, db: DatabaseConnection) -> Result<()>;
}
/// Central dispatcher: polls stored conversations, routes them to workers, and
/// delivers process events through the registered gateways.
pub struct SensoriumLoop {
// Registered workers, searched in registration order; the first matching pattern wins.
workers: Vec<WorkerRegistration>,
// Registered gateways; each is started on its own task by `run`.
gateways: Vec<Arc<dyn Gateway>>,
// Seconds to sleep between polls for unprocessed conversations.
conversations_poll_interval_secs: u64,
// Seconds to sleep between polls for undelivered process events.
process_event_deliveries_poll_interval_secs: u64,
// Matches the reserved `/help` command, optionally preceded by a mention.
help_re: Regex,
}
// `Default` is provided for convenience and is identical to `SensoriumLoop::new`.
impl Default for SensoriumLoop {
/// Delegates to `SensoriumLoop::new`.
fn default() -> Self {
Self::new()
}
}
impl SensoriumLoop {
pub fn new() -> Self {
info!("SensoriumLoop created");
Self {
workers: Vec::new(),
gateways: Vec::new(),
conversations_poll_interval_secs: 1,
process_event_deliveries_poll_interval_secs: 1,
help_re: Regex::new(r"(?i)(^<@\S+>\s*)?/help\b").expect("help regex is valid"),
}
}
pub fn with_conversations_poll_interval(mut self, secs: u64) -> Self {
info!(
conversations_poll_interval_secs = secs,
"SensoriumLoop conversations poll interval set"
);
self.conversations_poll_interval_secs = secs;
self
}
pub fn with_process_event_deliveries_poll_interval(mut self, secs: u64) -> Self {
info!(
process_event_deliveries_poll_interval_secs = secs,
"SensoriumLoop deliveries poll interval set"
);
self.process_event_deliveries_poll_interval_secs = secs;
self
}
pub fn register_worker(&mut self, pattern: &str, worker: Arc<dyn Worker>) -> Result<()> {
let compiled = Regex::new(pattern)
.map_err(|e| anyhow::anyhow!("invalid worker pattern {:?}: {}", pattern, e))?;
let help_probes = ["/help", "/HELP", "<@U1234> /help", "<@U1234> /HELP"];
if help_probes.iter().any(|s| compiled.is_match(s)) {
return Err(anyhow::anyhow!(
"worker pattern {:?} collides with the reserved /help command",
pattern
));
}
if let Some(existing) = self.workers.iter().find(|r| r.pattern.as_str() == pattern) {
return Err(anyhow::anyhow!(
"worker pattern {:?} already registered by worker {:?}",
pattern,
existing.worker.name()
));
}
debug!(
worker = worker.name(),
pattern,
total_workers = self.workers.len() + 1,
"Worker registered"
);
self.workers.push(WorkerRegistration {
pattern: compiled,
worker,
});
Ok(())
}
pub fn workers(&self) -> &[WorkerRegistration] {
&self.workers
}
pub fn register_gateway(&mut self, gateway: Arc<dyn Gateway>) {
let idx = self.gateways.len();
debug!(
gateway = gateway.name(),
gateway_index = idx,
total_gateways = idx + 1,
"Gateway registered"
);
self.gateways.push(gateway);
}
pub async fn run(self, db: DatabaseConnection) -> Result<()> {
use crate::entities::conversation::Model as Conversation;
use crate::entities::process_event::Model as ProcessEventModel;
use crate::entities::process_event_deliveries::Model as Delivery;
let workers = Arc::new(self.workers);
let gateways = Arc::new(self.gateways);
let help_re = Arc::new(self.help_re);
let conversations_poll_interval =
Duration::from_secs(self.conversations_poll_interval_secs);
let deliveries_poll_interval =
Duration::from_secs(self.process_event_deliveries_poll_interval_secs);
info!(
worker_count = workers.len(),
gateway_count = gateways.len(),
conversations_poll_interval_secs = self.conversations_poll_interval_secs,
process_event_deliveries_poll_interval_secs =
self.process_event_deliveries_poll_interval_secs,
"SensoriumLoop starting"
);
for (idx, gateway) in gateways.iter().enumerate() {
let gateway = gateway.clone();
let db_clone = db.clone();
debug!(
gateway = gateway.name(),
gateway_index = idx,
"Spawning gateway listener"
);
tokio::spawn(async move {
if let Err(e) = gateway.start(db_clone).await {
error!(error = %e, gateway_index = idx, "gateway stopped with error");
}
});
}
info!(
gateway_count = gateways.len(),
"All gateways spawned, starting poll loops"
);
let gateways_delivery = gateways.clone();
let db_delivery = db.clone();
tokio::spawn(async move {
loop {
let undelivered_events = match ProcessEventModel::find_undelivered(&db_delivery)
.await
{
Ok(rows) => rows,
Err(e) => {
warn!(error = %e, "delivery poll: failed to query undelivered process_events");
tokio::time::sleep(deliveries_poll_interval).await;
continue;
}
};
for event in &undelivered_events {
let conv = match Conversation::find_by_id(&db_delivery, event.conversation_id)
.await
{
Ok(Some(c)) => c,
Ok(None) => {
debug!(
process_event_id = event.id,
conversation_id = event.conversation_id,
"delivery poll: conversation not found, skipping"
);
continue;
}
Err(e) => {
warn!(error = %e, process_event_id = event.id, "delivery poll: failed to load conversation");
continue;
}
};
let target_gateways: Vec<_> = match conv.gateway_id {
Some(gw_id) => {
use crate::entities::gateway::Model as GatewayModel;
match GatewayModel::find_by_id(&db_delivery, gw_id).await {
Ok(Some(gw_row)) => gateways_delivery
.iter()
.filter(|g| g.name() == gw_row.name)
.cloned()
.collect(),
_ => {
warn!(
process_event_id = event.id,
gateway_id = gw_id,
"delivery poll: gateway row not found, skipping"
);
continue;
}
}
}
None => gateways_delivery.iter().cloned().collect(),
};
for gateway in target_gateways.iter() {
let gateway_name = gateway.name();
let delivery_id =
match Delivery::upsert_pending(&db_delivery, event.id, gateway_name)
.await
{
Ok(id) => id,
Err(e) => {
warn!(
error = %e,
process_event_id = event.id,
gateway = gateway_name,
"delivery poll: failed to upsert delivery record"
);
continue;
}
};
let pending =
match Delivery::find_pending_for_gateway(&db_delivery, gateway_name)
.await
{
Ok(rows) => rows,
Err(e) => {
warn!(
error = %e,
gateway = gateway_name,
"delivery poll: failed to find pending deliveries"
);
continue;
}
};
let is_eligible = pending.iter().any(|r| r.id == delivery_id);
if !is_eligible {
debug!(
process_event_id = event.id,
gateway = gateway_name,
delivery_id,
"delivery poll: skipping — within backoff window"
);
continue;
}
let msg = MessageEvent::delivered(event.content.clone())
.with_channel(conv.gateway_channel_id.clone());
match gateway.send(msg).await {
Ok(_) => {
debug!(
process_event_id = event.id,
gateway = gateway_name,
"delivery poll: delivered successfully"
);
if let Err(e) =
Delivery::mark_delivered(&db_delivery, delivery_id).await
{
warn!(error = %e, delivery_id, "delivery poll: failed to mark delivery record");
}
match Delivery::count_pending_for_event(&db_delivery, event.id)
.await
{
Ok(0) => {
if let Err(e) = ProcessEventModel::mark_delivered(
&db_delivery,
event.id,
)
.await
{
warn!(
error = %e,
process_event_id = event.id,
"delivery poll: failed to mark process_event delivered"
);
} else {
info!(
process_event_id = event.id,
"delivery poll: all gateways delivered — process_event marked done"
);
}
}
Ok(pending_count) => {
debug!(
process_event_id = event.id,
pending_count,
"delivery poll: still waiting on other gateways"
);
}
Err(e) => {
warn!(
error = %e,
process_event_id = event.id,
"delivery poll: failed to count pending deliveries"
);
}
}
}
Err(e) => {
warn!(
error = %e,
process_event_id = event.id,
gateway = gateway_name,
"delivery poll: delivery failed, scheduling retry"
);
if let Err(re) = Delivery::record_failure(
&db_delivery,
delivery_id,
&format!("{:#}", e),
)
.await
{
warn!(error = %re, delivery_id, "delivery poll: failed to record failure");
}
}
}
}
}
tokio::time::sleep(deliveries_poll_interval).await;
}
});
loop {
let unprocessed = match Conversation::find_unprocessed(&db).await {
Ok(rows) => rows,
Err(e) => {
warn!(error = %e, "conversations poll: failed to query unprocessed conversations");
tokio::time::sleep(conversations_poll_interval).await;
continue;
}
};
if !unprocessed.is_empty() {
debug!(
count = unprocessed.len(),
"conversations poll: found unprocessed rows"
);
}
for row in unprocessed {
let conversation_id = row.id;
let msg = MessageEvent {
id: Uuid::new_v4(),
kind: MessageEventKind::Received,
content: row.content.clone(),
channel_id: row.gateway_channel_id.clone(),
user_id: row.user_id.clone(),
thread_ts: if row.thread_ts.is_empty() {
None
} else {
Some(row.thread_ts.clone())
},
conversation_id: Some(conversation_id),
};
info!(
conversation_id,
channel_id = %row.gateway_channel_id,
user_id = %row.user_id,
thread_ts = ?msg.thread_ts,
preview = %&row.content[..row.content.len().min(80)],
"Conversation picked up for dispatch"
);
if help_re.is_match(&msg.content) {
let response = build_help_response(&workers);
if let Some(conv_id) = msg.conversation_id {
let db_h = db.clone();
tokio::spawn(async move {
if let Err(e) = crate::entities::process_event::Model::insert(
&db_h,
conv_id,
ProcessEventKind::Completed.as_str(),
&response,
)
.await
{
warn!(error = %e, conversation_id = conv_id, "help: failed to persist process event");
}
});
}
if let Err(e) = Conversation::mark_processed(&db, conversation_id).await {
warn!(error = %e, conversation_id, "conversations poll: failed to mark conversation as processed");
}
continue;
}
if let Err(e) = Conversation::mark_processed(&db, conversation_id).await {
warn!(error = %e, conversation_id, "conversations poll: failed to mark conversation as processed");
continue;
}
let matched = workers.iter().find(|r| r.pattern.is_match(&msg.content));
match matched {
Some(registration) => {
let args = parse_kv(&msg.content);
let worker = registration.worker.clone();
let worker_name = worker.name();
debug!(
worker = worker_name,
conversation_id,
pattern = registration.pattern.as_str(),
"Dispatching to matched worker"
);
let db_w = db.clone();
let msg = msg.clone();
tokio::spawn(async move {
match worker.handle(db_w, msg, args).await {
Ok(_) => {
info!(worker = worker_name, conversation_id, "Worker completed")
}
Err(e) => {
error!(error = %e, worker = worker_name, conversation_id, "Worker failed")
}
}
});
}
None => {
warn!(conversation_id, preview = %&row.content[..row.content.len().min(80)], "No worker matched message, skipping");
}
}
}
tokio::time::sleep(conversations_poll_interval).await;
}
}
}
#[cfg(test)]
#[path = "plugin_tests.rs"]
mod plugin_tests;