use std::{
collections::{HashMap, HashSet},
sync::{Arc, OnceLock, RwLock},
time::Duration,
};
use anyhow::Result;
use futures::StreamExt as _;
use md5::{Digest, Md5};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::{Notify, mpsc};
use tracing::{error, info, warn};
use rsclaw_agent::{AgentMessage, AgentRegistry, FileAttachment, ImageAttachment};
use rsclaw_channel::OutboundMessage;
use rsclaw_store::redb_store::RedbStore;
pub use rsclaw_types::{Priority, TaskStatus, QueuedFile, QueuedMessage, QueuedTask, default_max_turns, compute_hash};
#[derive(Debug)]
pub enum TaskOutcome {
Done,
Partial,
Stuck(String),
Error(String),
Structured(StructuredOutcome),
NeedsInput(String),
}
pub use rsclaw_types::{StructuredOutcome, Completion, Recommend};
pub use rsclaw_types::SkipEntry;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DispatchAction {
Complete,
Fail,
AutoContinue { prompt: String, slow: bool },
Spawn { tasks: Vec<String> },
}
pub fn decide_action(outcome: &TaskOutcome, turn: u32, max_turns: u32) -> DispatchAction {
let at_max = max_turns == 0 || turn >= max_turns;
match outcome {
TaskOutcome::Done => DispatchAction::Complete,
TaskOutcome::Structured(out) => match out.recommend {
Recommend::Ship | Recommend::NeedsHuman => DispatchAction::Complete,
Recommend::Abandon => DispatchAction::Fail,
Recommend::Retry => {
if at_max {
DispatchAction::Fail
} else {
DispatchAction::AutoContinue {
prompt: format!(
"[auto-continue turn {turn}] Retry the task — \
your previous attempt asked for a fresh retry. \
Try again, change something if you can."
),
slow: true, }
}
}
Recommend::Continue => {
if out.follow_up_tasks.is_empty() {
DispatchAction::Complete
} else {
DispatchAction::Spawn {
tasks: out.follow_up_tasks.clone(),
}
}
}
},
TaskOutcome::NeedsInput(_) => DispatchAction::Complete,
TaskOutcome::Partial | TaskOutcome::Stuck(_) | TaskOutcome::Error(_) => {
if at_max {
DispatchAction::Complete
} else {
DispatchAction::AutoContinue {
prompt: continuation_prompt(outcome, turn),
slow: matches!(outcome, TaskOutcome::Error(_)),
}
}
}
}
}
pub use rsclaw_types::{stage_pending_outcome, drain_pending_outcome};
pub use rsclaw_types::{TASK_DEFAULT_MAX_TURNS, TASK_DEFAULT_TTL_SECS};
fn parse_task_prefix(text: &mut String) -> (u32, u64) {
let normalized: String = text.replace(['\u{2014}', '\u{2013}', '\u{2012}', '\u{2015}'], "--");
let trimmed = normalized.trim();
if !trimmed.starts_with("/task ") && trimmed != "/task" {
*text = normalized;
return (0, TASK_DEFAULT_TTL_SECS);
}
let rest = trimmed.strip_prefix("/task").unwrap_or(trimmed).trim();
let mut max_turns = TASK_DEFAULT_MAX_TURNS;
let mut ttl_secs = TASK_DEFAULT_TTL_SECS;
let mut msg_parts: Vec<&str> = Vec::new();
let mut iter = rest.split_whitespace().peekable();
while let Some(tok) = iter.next() {
match tok {
"--turns" | "-n" => {
if let Some(val) = iter.peek().and_then(|v| v.parse::<u32>().ok()) {
max_turns = val;
iter.next();
continue;
}
msg_parts.push(tok);
}
"--timeout" | "-t" => {
if let Some(val) = iter.peek().and_then(|v| parse_duration_str(v)) {
ttl_secs = val;
iter.next();
continue;
}
msg_parts.push(tok);
}
_ => msg_parts.push(tok),
}
}
*text = msg_parts.join(" ");
(max_turns, ttl_secs)
}
fn parse_duration_str(s: &str) -> Option<u64> {
let mut total: u64 = 0;
let mut num_buf = String::new();
for c in s.chars() {
if c.is_ascii_digit() {
num_buf.push(c);
} else {
let n: u64 = num_buf.parse().ok()?;
num_buf.clear();
match c {
'h' | 'H' => total += n * 3600,
'm' | 'M' => total += n * 60,
's' | 'S' => total += n,
_ => return None,
}
}
}
if !num_buf.is_empty() {
total += num_buf.parse::<u64>().ok()?;
}
if total > 0 { Some(total) } else { None }
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueStats {
pub pending: usize,
pub running: usize,
pub done: usize,
pub failed: usize,
pub dead: usize,
}
pub struct TaskQueueManager {
store: Arc<RedbStore>,
notify: Notify,
}
type ChannelSendersMap = Arc<RwLock<HashMap<String, mpsc::Sender<OutboundMessage>>>>;
static CHANNEL_SENDERS: OnceLock<ChannelSendersMap> = OnceLock::new();
static TASK_QUEUE: OnceLock<Arc<TaskQueueManager>> = OnceLock::new();
static PLUGIN_BACKGROUND_KEYS: OnceLock<Arc<RwLock<HashSet<String>>>> = OnceLock::new();
fn plugin_background_keys() -> Arc<RwLock<HashSet<String>>> {
PLUGIN_BACKGROUND_KEYS
.get_or_init(|| Arc::new(RwLock::new(HashSet::new())))
.clone()
}
pub fn install_channel_senders(senders: ChannelSendersMap) {
if CHANNEL_SENDERS.set(senders).is_err() {
warn!("task_queue: channel senders already installed, ignoring duplicate install");
}
}
pub fn install_task_queue(manager: Arc<TaskQueueManager>) {
if TASK_QUEUE.set(manager).is_err() {
warn!("task_queue: manager already installed, ignoring duplicate install");
}
}
pub fn get_task_queue() -> Option<Arc<TaskQueueManager>> {
TASK_QUEUE.get().cloned()
}
pub fn push_outbound(
channel: &str,
account: Option<&str>,
msg: OutboundMessage,
) -> Result<(), String> {
let tx = lookup_channel_sender_for(channel, account)
.ok_or_else(|| format!("channel sender '{channel}' not registered"))?;
tx.try_send(msg)
.map_err(|e| format!("channel '{channel}' send failed: {e}"))
}
fn lookup_channel_sender_for(
name: &str,
account: Option<&str>,
) -> Option<mpsc::Sender<OutboundMessage>> {
let map = CHANNEL_SENDERS.get()?.read().ok()?;
if let Some(acct) = account.filter(|s| !s.is_empty()) {
let key = format!("{name}/{acct}");
if let Some(tx) = map.get(&key).cloned() {
return Some(tx);
}
}
map.get(name).cloned()
}
fn task_ack_text(task_id: &str, max_turns: u32, ttl_secs: u64, lang: &str) -> String {
let ttl_human = if ttl_secs >= 3600 && ttl_secs % 3600 == 0 {
format!("{}h", ttl_secs / 3600)
} else if ttl_secs >= 60 && ttl_secs % 60 == 0 {
format!("{}m", ttl_secs / 60)
} else {
format!("{ttl_secs}s")
};
if lang == "zh" {
format!(
"任务已收到,开始处理(最多 {max_turns} 轮,超时 {ttl_human})\nID: {task_id}\n中止: /abort"
)
} else {
format!(
"Task received, working on it (up to {max_turns} turns, timeout {ttl_human})\nID: {task_id}\nAbort: /abort"
)
}
}
fn send_task_ack(task: &QueuedTask, max_turns: u32, ttl_secs: u64) {
let Some(msg) = task.messages.first() else {
return;
};
let Some(tx) = lookup_channel_sender_for(&msg.channel, msg.account.as_deref()) else {
warn!(channel = %msg.channel, task_id = %task.id, "task_queue: channel sender not registered, ack dropped");
return;
};
let lang = rsclaw_i18n::default_lang();
let ack = OutboundMessage {
target_id: msg.chat_id.clone(),
is_group: msg.is_group,
text: task_ack_text(&task.id, max_turns, ttl_secs, lang),
reply_to: msg.reply_to.clone(),
images: vec![],
files: vec![],
channel: Some(msg.channel.clone()),
account: msg.account.clone(),
};
if let Err(e) = tx.try_send(ack) {
warn!(channel = %msg.channel, task_id = %task.id, error = %e, "task_queue: ack send failed");
}
}
impl TaskQueueManager {
pub fn new(store: Arc<RedbStore>) -> Self {
Self {
store,
notify: Notify::new(),
}
}
pub async fn notified(&self) {
self.notify.notified().await;
}
pub fn submit(
&self,
session_key: &str,
mut message: QueuedMessage,
priority: Priority,
) -> Result<(String, bool)> {
let (max_turns, ttl_secs) = parse_task_prefix(&mut message.text);
let hash = compute_hash(&message.text);
if self.store.has_duplicate(session_key, &hash)? {
tracing::info!(session_key, "task_queue: duplicate message dropped");
return Ok(("dedup".to_string(), false));
}
if self.store.merge_into_pending(session_key, &message)? {
tracing::info!(session_key, "task_queue: message merged into pending task");
self.notify.notify_one();
return Ok(("merged".to_string(), true));
}
let mut task = QueuedTask::new(session_key.to_string(), message, priority);
task.max_turns = max_turns;
task.ttl_secs = ttl_secs;
let id = task.id.clone();
self.store.enqueue_task(&task)?;
if max_turns > 0 {
tracing::info!(session_key, task_id = %id, max_turns, ttl_secs, "task_queue: task enqueued (task mode)");
send_task_ack(&task, max_turns, ttl_secs);
} else {
tracing::info!(session_key, task_id = %id, "task_queue: message enqueued");
}
self.notify.notify_one();
Ok((id, false))
}
pub fn submit_task(
&self,
session_key: &str,
message: QueuedMessage,
priority: Priority,
max_turns: u32,
ttl_secs: u64,
) -> Result<(String, bool)> {
let hash = compute_hash(&message.text);
if self.store.has_duplicate(session_key, &hash)? {
tracing::info!(session_key, "task_queue: duplicate task dropped");
return Ok(("dedup".to_string(), false));
}
if self.store.merge_into_pending(session_key, &message)? {
tracing::info!(session_key, "task_queue: message merged into pending task");
self.notify.notify_one();
return Ok(("merged".to_string(), true));
}
let mut task = QueuedTask::new(session_key.to_string(), message, priority);
task.max_turns = max_turns;
task.ttl_secs = ttl_secs;
let id = task.id.clone();
self.store.enqueue_task(&task)?;
tracing::info!(session_key, task_id = %id, max_turns, ttl_secs, "task_queue: task enqueued");
send_task_ack(&task, max_turns, ttl_secs);
self.notify.notify_one();
Ok((id, false))
}
pub fn next(&self) -> Result<Option<QueuedTask>> {
let cleaned = self.store.cleanup_expired_tasks()?;
if cleaned > 0 {
tracing::info!(count = cleaned, "task_queue: cleaned expired tasks");
}
self.store.dequeue_task()
}
pub fn complete(&self, task_id: &str) -> Result<()> {
self.store.update_task_status(task_id, TaskStatus::Done)
}
pub fn recover_orphan_tasks(&self) -> Result<usize> {
self.store.requeue_running_tasks()
}
pub fn mark_notified(&self, task_id: &str) -> Result<()> {
self.store.mark_task_notified(task_id)
}
pub fn record_last_reply(&self, task_id: &str, text: &str) -> Result<()> {
self.store.update_task_last_reply(task_id, text)
}
pub fn record_turn(&self, task_id: &str, turn: u32) -> Result<()> {
self.store.update_task_turn(task_id, turn)
}
pub fn is_idem_delivered(&self, key: &str) -> Result<bool> {
self.store.is_idem_delivered(key)
}
pub fn mark_idem_delivered(&self, key: &str) -> Result<()> {
self.store.mark_idem_delivered(key)
}
pub fn cleanup_idem_keys(&self, retention_secs: i64) -> Result<usize> {
self.store.cleanup_idem_keys(retention_secs)
}
pub fn list_pending_notifications(&self, session_key: &str) -> Result<Vec<QueuedTask>> {
let mut all = self.store.list_tasks(Some(TaskStatus::Done))?;
all.retain(|t| t.session_key == session_key && !t.notified);
Ok(all)
}
pub fn fail(&self, task_id: &str, _error: &str, max_retries: u32) -> Result<TaskStatus> {
self.store.fail_task(task_id, max_retries)
}
pub fn stats(&self) -> Result<QueueStats> {
let all = self.store.list_tasks(None)?;
Ok(QueueStats {
pending: all
.iter()
.filter(|t| t.status == TaskStatus::Pending)
.count(),
running: all
.iter()
.filter(|t| t.status == TaskStatus::Running)
.count(),
done: all.iter().filter(|t| t.status == TaskStatus::Done).count(),
failed: all
.iter()
.filter(|t| t.status == TaskStatus::Failed)
.count(),
dead: all.iter().filter(|t| t.status == TaskStatus::Dead).count(),
})
}
}
fn staging_dir() -> std::path::PathBuf {
rsclaw_config::loader::base_dir().join("var/data/queue/staging")
}
pub fn stage_file(filename: &str, data: &[u8], mime_type: &str) -> Result<QueuedFile> {
let dir = staging_dir();
std::fs::create_dir_all(&dir)?;
let safe_name = filename.replace(['/', '\\'], "_");
let staged = format!("{}_{}", uuid::Uuid::new_v4(), safe_name);
let path = dir.join(&staged);
std::fs::write(&path, data)?;
Ok(QueuedFile {
filename: filename.to_string(),
path: path.to_string_lossy().to_string(),
mime_type: mime_type.to_string(),
})
}
fn unstage_file(qf: &QueuedFile) -> FileAttachment {
let data = std::fs::read(&qf.path).unwrap_or_default();
FileAttachment {
filename: qf.filename.clone(),
data,
mime_type: qf.mime_type.clone(),
}
}
fn cleanup_staged_files(task: &QueuedTask) {
for msg in &task.messages {
for qf in &msg.files {
if let Err(e) = std::fs::remove_file(&qf.path) {
tracing::debug!(path = %qf.path, "staging cleanup: {e}");
}
}
}
}
pub fn submit_to_queue(
manager: &TaskQueueManager,
session_key: &str,
text: &str,
channel: &str,
peer_id: &str,
chat_id: &str,
is_group: bool,
priority: Priority,
) -> Result<(String, bool)> {
let message = QueuedMessage {
text: text.to_string(),
sender: peer_id.to_string(),
channel: channel.to_string(),
chat_id: chat_id.to_string(),
is_group,
reply_to: None,
timestamp: chrono::Utc::now().timestamp(),
images: vec![],
files: vec![],
account: None,
};
manager.submit(session_key, message, priority)
}
fn classify_outcome(reply: &rsclaw_agent::AgentReply) -> TaskOutcome {
let text = reply.text.trim();
if text.is_empty() && reply.images.is_empty() && reply.files.is_empty() {
return TaskOutcome::Stuck("empty reply".to_string());
}
let lower = text.to_lowercase();
for pat in [
"rate limit",
"rate_limit",
"quota exceeded",
"context length exceeded",
"context_length_exceeded",
"maximum context",
"too many tokens",
] {
if lower.contains(pat) {
return TaskOutcome::Error(pat.to_string());
}
}
for pat in [
"i can't",
"i cannot",
"i'm unable",
"i am unable",
"i don't know how",
"i'm not sure how",
"i need more information",
"please provide",
"could you clarify",
"i'm stuck",
] {
if lower.contains(pat) {
return TaskOutcome::Stuck(pat.to_string());
}
}
for pat in [
"无法完成",
"做不到",
"我没法",
"我不知道怎么",
"需要更多信息",
"请提供",
"请告诉我",
"卡住了",
"我不太确定",
] {
if text.contains(pat) {
return TaskOutcome::Stuck(pat.to_string());
}
}
for pat in [
"i'll continue",
"i will continue",
"next step",
"let me continue",
"continuing",
"in progress",
"working on",
"todo",
"to be continued",
"not yet complete",
"partially done",
] {
if lower.contains(pat) {
return TaskOutcome::Partial;
}
}
for pat in [
"继续",
"下一步",
"未完成",
"还需要",
"稍后",
"进行中",
"正在",
"待办",
"尚未完成",
"部分完成",
] {
if text.contains(pat) {
return TaskOutcome::Partial;
}
}
TaskOutcome::Done
}
fn continuation_prompt(outcome: &TaskOutcome, turn: u32) -> String {
match outcome {
TaskOutcome::Partial => {
format!(
"[auto-continue turn {turn}] Continue from where you left off. Complete the remaining work."
)
}
TaskOutcome::Stuck(reason) => {
format!(
"[auto-continue turn {turn}] Previous attempt got stuck ({reason}). \
Try a different approach. If truly impossible, explain why \
concisely and stop."
)
}
TaskOutcome::Error(err) => {
format!(
"[auto-continue turn {turn}] Previous attempt encountered an error: {err}. \
Retry or work around it."
)
}
TaskOutcome::Done => String::new(),
TaskOutcome::Structured(_) | TaskOutcome::NeedsInput(_) => String::new(),
}
}
pub struct TaskQueueWorker {
manager: Arc<TaskQueueManager>,
registry: Arc<AgentRegistry>,
channel_senders: Arc<std::sync::RwLock<HashMap<String, mpsc::Sender<OutboundMessage>>>>,
shutdown: super::shutdown::ShutdownCoordinator,
config: rsclaw_config::runtime::RuntimeConfig,
}
impl TaskQueueWorker {
pub fn new(
manager: Arc<TaskQueueManager>,
registry: Arc<AgentRegistry>,
channel_senders: Arc<std::sync::RwLock<HashMap<String, mpsc::Sender<OutboundMessage>>>>,
shutdown: super::shutdown::ShutdownCoordinator,
config: rsclaw_config::runtime::RuntimeConfig,
) -> Self {
Self {
manager,
registry,
channel_senders,
shutdown,
config,
}
}
fn channel_tx_for(
&self,
name: &str,
account: Option<&str>,
) -> Option<mpsc::Sender<OutboundMessage>> {
if let Some(acct) = account.filter(|s| !s.is_empty()) {
let key = format!("{name}/{acct}");
if let Some(tx) = self
.channel_senders
.read()
.ok()
.and_then(|map| map.get(&key).cloned())
{
return Some(tx);
}
}
self.channel_tx(name)
}
fn channel_tx(&self, name: &str) -> Option<mpsc::Sender<OutboundMessage>> {
self.channel_senders
.read()
.expect("channel_senders lock poisoned")
.get(name)
.cloned()
}
async fn notify_user_failure(
&self,
channel_name: &str,
account: Option<&str>,
target: &str,
is_group: bool,
reply_to: Option<String>,
turn: u32,
reason: &str,
) {
let Some(tx) = self.channel_tx_for(channel_name, account) else {
warn!(channel = %channel_name, "no channel sender registered, failure notice dropped");
return;
};
let text = rsclaw_i18n::t_fmt(
"task_notify_failure",
rsclaw_i18n::default_lang(),
&[("reason", reason)],
);
let out = OutboundMessage {
target_id: target.to_owned(),
is_group,
text,
reply_to: if turn == 1 { reply_to } else { None },
images: vec![],
files: vec![],
channel: Some(channel_name.to_owned()),
account: account.map(str::to_owned),
};
if let Err(e) = tx.send(out).await {
error!(channel = %channel_name, "failure notice send failed: {e}");
}
}
pub async fn run(self: Arc<Self>) {
info!("task queue worker started");
match self.manager.recover_orphan_tasks() {
Ok(0) => {}
Ok(n) => info!(
count = n,
"task queue worker: revived orphan Running tasks → Pending"
),
Err(e) => error!("task queue worker: orphan recovery failed: {e:#}"),
}
let mut idem_gc_counter: u32 = 0;
loop {
if self.shutdown.is_draining() {
info!("task queue worker: drain signaled, stopping dequeue");
break;
}
match self.manager.next() {
Ok(Some(task)) => {
let guard = self.shutdown.begin_work();
let worker = Arc::clone(&self);
tokio::spawn(async move {
worker.process_task(task).await;
drop(guard);
});
continue;
}
Ok(None) => {
tokio::select! {
() = self.manager.notified() => {}
() = tokio::time::sleep(Duration::from_secs(5)) => {}
}
}
Err(e) => {
error!("task queue worker: dequeue error: {e:#}");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
idem_gc_counter = idem_gc_counter.wrapping_add(1);
if idem_gc_counter % 720 == 0 {
match self.manager.cleanup_idem_keys(24 * 3600) {
Ok(0) => {}
Ok(n) => info!(count = n, "task queue worker: cleaned old idem keys"),
Err(e) => warn!("task queue worker: idem cleanup failed: {e:#}"),
}
}
}
info!("task queue worker exited");
}
async fn process_task(&self, task: QueuedTask) {
let task_id = task.id.clone();
let session_key = task.session_key.clone();
let max_turns = task.max_turns;
let Some(first_msg) = task.messages.first() else {
error!(task_id = %task_id, "task queue worker: task has no messages, skipping");
return;
};
let channel_name = first_msg.channel.clone();
let account = first_msg.account.clone();
let peer_id = first_msg.sender.clone();
let chat_id = first_msg.chat_id.clone();
let is_group = first_msg.is_group;
let reply_to = first_msg.reply_to.clone();
info!(
task_id = %task_id,
session_key = %session_key,
channel = %channel_name,
messages = task.messages.len(),
max_turns,
"task queue worker: processing task"
);
let handle = match self.registry.route(&channel_name) {
Ok(h) => h,
Err(_) => match self.registry.default_agent() {
Ok(h) => h,
Err(e) => {
error!(task_id = %task_id, "task queue worker: no agent for channel {channel_name}: {e:#}");
if let Err(fe) =
self.manager
.fail(&task_id, &format!("{e:#}"), task.max_retries)
{
error!(task_id = %task_id, "task queue worker: fail() error: {fe:#}");
}
return;
}
},
};
let first_text = task.merged_text();
let first_images: Vec<ImageAttachment> = task
.messages
.iter()
.flat_map(|m| {
m.images.iter().map(|data| ImageAttachment {
data: data.clone(),
mime_type: "image/png".to_string(),
source_path: None,
})
})
.collect();
let first_files: Vec<FileAttachment> = task
.messages
.iter()
.flat_map(|m| m.files.iter().map(unstage_file))
.collect();
let target = if chat_id.is_empty() {
peer_id.clone()
} else {
chat_id.clone()
};
let mut turn: u32 = task.turns;
if turn > 0 {
info!(task_id = %task_id, resume_turn = turn, "task queue worker: resuming /task after recovery");
}
let mut next_text = first_text;
let mut next_images = first_images;
let mut next_files = first_files;
let mut last_send_ok = false;
loop {
turn += 1;
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
let msg = AgentMessage {
session_key: session_key.clone(),
text: next_text,
channel: channel_name.clone(),
peer_id: peer_id.clone(),
chat_id: chat_id.clone(),
reply_tx,
task_id: None,
context_id: None,
event_tx: None,
cancel_token: None,
input_request_tx: None,
extra_tools: vec![],
images: next_images,
files: next_files,
account: account.clone(),
};
info!(task_id = %task_id, turn, "task queue worker: agent turn");
if handle.tx.send(msg).await.is_err() {
error!(task_id = %task_id, "task queue worker: agent channel closed");
if let Err(fe) =
self.manager
.fail(&task_id, "agent channel closed", task.max_retries)
{
error!(task_id = %task_id, "task queue worker: fail() error: {fe:#}");
}
break;
}
let reply = match tokio::time::timeout(Duration::from_secs(2700), reply_rx).await {
Ok(Ok(r)) => r,
Ok(Err(_)) => {
error!(task_id = %task_id, turn, "task queue worker: reply channel dropped");
self.notify_user_failure(
&channel_name,
account.as_deref(),
&target,
is_group,
reply_to.clone(),
turn,
"reply channel dropped",
)
.await;
match self
.manager
.fail(&task_id, "reply channel dropped", task.max_retries)
{
Ok(TaskStatus::Dead) => cleanup_staged_files(&task),
Err(fe) => error!(task_id = %task_id, "fail() error: {fe:#}"),
_ => {}
}
break;
}
Err(_) => {
error!(task_id = %task_id, turn, "task queue worker: reply timeout (2700s)");
self.notify_user_failure(
&channel_name,
account.as_deref(),
&target,
is_group,
reply_to.clone(),
turn,
"reply timeout (45m)",
)
.await;
match self
.manager
.fail(&task_id, "reply timeout", task.max_retries)
{
Ok(TaskStatus::Dead) => cleanup_staged_files(&task),
Err(fe) => error!(task_id = %task_id, "fail() error: {fe:#}"),
_ => {}
}
break;
}
};
let outcome = match drain_pending_outcome(&task.session_key) {
Some(structured) => {
info!(
task_id = %task_id,
completion = ?structured.completion,
recommend = ?structured.recommend,
"task queue worker: using agent-declared structured outcome"
);
TaskOutcome::Structured(structured)
}
None => classify_outcome(&reply),
};
let pending = reply.pending_analysis;
let had_reply_payload =
!reply.text.is_empty() || !reply.images.is_empty() || !reply.files.is_empty();
if !reply.text.is_empty() {
if let Err(e) = self.manager.record_last_reply(&task_id, &reply.text) {
tracing::warn!(task_id = %task_id, "record_last_reply failed: {e:#}");
}
}
if had_reply_payload {
let idem_key = format!("task:{task_id}:turn:{turn}");
let already_delivered = match self.manager.is_idem_delivered(&idem_key) {
Ok(v) => v,
Err(e) => {
warn!(task_id = %task_id, "is_idem_delivered failed: {e:#}");
false
}
};
if already_delivered {
info!(
task_id = %task_id, turn,
"task queue worker: turn reply already delivered, skipping channel send"
);
last_send_ok = true;
} else {
let out = OutboundMessage {
target_id: target.clone(),
is_group,
text: reply.text.clone(),
reply_to: if turn == 1 { reply_to.clone() } else { None },
images: reply.images.clone(),
files: reply.files.clone(),
channel: Some(channel_name.clone()),
account: account.clone(),
};
if let Some(tx) = self.channel_tx_for(&channel_name, account.as_deref()) {
match tx.send(out).await {
Ok(_) => {
last_send_ok = true;
if let Err(e) = self.manager.mark_idem_delivered(&idem_key) {
warn!(task_id = %task_id, "mark_idem_delivered failed: {e:#}");
}
}
Err(e) => {
last_send_ok = false;
error!(task_id = %task_id, "send reply failed: {e}");
}
}
} else {
last_send_ok = false;
tracing::warn!(
task_id = %task_id,
channel = %channel_name,
"no channel sender registered, reply dropped"
);
}
}
}
if let Some(analysis) = pending {
if let Some(tx) = self.channel_tx_for(&channel_name, account.as_deref()) {
crate::gateway::startup::handle_pending_analysis(
analysis,
Arc::clone(&handle),
&tx,
target.clone(),
is_group,
&self.config,
)
.await;
}
}
info!(task_id = %task_id, turn, outcome = ?outcome, "task queue worker: turn outcome");
if let Err(e) = self.manager.record_turn(&task_id, turn) {
tracing::warn!(task_id = %task_id, "record_turn failed: {e:#}");
}
let action = decide_action(&outcome, turn, max_turns);
info!(task_id = %task_id, turn, ?action, "task queue worker: action");
match action {
DispatchAction::Complete => {
if let Err(e) = self.manager.complete(&task_id) {
error!(task_id = %task_id, "complete() error: {e:#}");
}
if last_send_ok {
if let Err(e) = self.manager.mark_notified(&task_id) {
error!(task_id = %task_id, "mark_notified() error: {e:#}");
}
}
cleanup_staged_files(&task);
break;
}
DispatchAction::Fail => {
if let Err(e) = self.manager.fail(&task_id, "agent abandoned", 0) {
error!(task_id = %task_id, "fail() error: {e:#}");
}
if last_send_ok {
if let Err(e) = self.manager.mark_notified(&task_id) {
error!(task_id = %task_id, "mark_notified() error: {e:#}");
}
}
cleanup_staged_files(&task);
break;
}
DispatchAction::Spawn { tasks } => {
let base = task.messages.first().cloned();
let now = chrono::Utc::now().timestamp();
let spawned = tasks.len();
for follow_up in tasks {
let Some(ref base_msg) = base else {
warn!(task_id = %task_id, "spawn: no base message to inherit channel from");
break;
};
let msg = QueuedMessage {
text: follow_up,
sender: format!("{}:follow_up", base_msg.sender),
channel: base_msg.channel.clone(),
account: base_msg.account.clone(),
chat_id: base_msg.chat_id.clone(),
is_group: base_msg.is_group,
reply_to: None,
timestamp: now,
images: vec![],
files: vec![],
};
match self.manager.submit_task(
&task.session_key,
msg,
Priority::System,
task.max_turns,
task.ttl_secs,
) {
Ok((new_id, _)) => {
info!(parent = %task_id, child = %new_id, "spawn: follow-up enqueued");
}
Err(e) => {
warn!(parent = %task_id, "spawn: submit_task failed: {e:#}");
}
}
}
info!(task_id = %task_id, spawned, "spawn: parent task completing");
if let Err(e) = self.manager.complete(&task_id) {
error!(task_id = %task_id, "complete() error: {e:#}");
}
if last_send_ok {
if let Err(e) = self.manager.mark_notified(&task_id) {
error!(task_id = %task_id, "mark_notified() error: {e:#}");
}
}
cleanup_staged_files(&task);
break;
}
DispatchAction::AutoContinue { prompt, slow } => {
next_text = prompt;
next_images = vec![];
next_files = vec![];
if slow {
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_task_prefix_short_flags() {
let mut text = "/task -n 20 fix the login bug".to_string();
let (turns, ttl) = parse_task_prefix(&mut text);
assert_eq!(turns, 20);
assert_eq!(ttl, TASK_DEFAULT_TTL_SECS);
assert_eq!(text, "fix the login bug");
}
#[test]
fn parse_task_prefix_short_flags_combined() {
let mut text = "/task -n 50 -t 4h refactor payments".to_string();
let (turns, ttl) = parse_task_prefix(&mut text);
assert_eq!(turns, 50);
assert_eq!(ttl, 4 * 3600);
assert_eq!(text, "refactor payments");
}
#[test]
fn parse_task_prefix_long_flags_still_work() {
let mut text = "/task --turns 30 --timeout 2h work".to_string();
let (turns, ttl) = parse_task_prefix(&mut text);
assert_eq!(turns, 30);
assert_eq!(ttl, 2 * 3600);
assert_eq!(text, "work");
}
#[test]
fn parse_task_prefix_em_dash_normalized() {
let mut text = "/task \u{2014}turns 25 \u{2014}timeout 30m do x".to_string();
let (turns, ttl) = parse_task_prefix(&mut text);
assert_eq!(turns, 25);
assert_eq!(ttl, 30 * 60);
assert_eq!(text, "do x");
}
#[test]
fn parse_task_prefix_no_task_prefix_chat_mode() {
let mut text = "hello there".to_string();
let (turns, _ttl) = parse_task_prefix(&mut text);
assert_eq!(turns, 0);
}
#[test]
fn parse_task_prefix_n_without_value_kept_as_text() {
let mut text = "/task -n investigate logs".to_string();
let (turns, _ttl) = parse_task_prefix(&mut text);
assert_eq!(turns, TASK_DEFAULT_MAX_TURNS);
assert_eq!(text, "-n investigate logs");
}
fn make_outcome(completion: Completion, recommend: Recommend) -> StructuredOutcome {
StructuredOutcome {
completion,
recommend,
verified: false,
verification_log: None,
accomplished: vec!["did the thing".into()],
skipped: vec![],
blocked_on: vec![],
assumptions: vec![],
follow_up_tasks: vec![],
summary: None,
}
}
#[test]
fn structured_outcome_serializes_snake_case() {
let mut out = make_outcome(Completion::Partial, Recommend::Continue);
out.follow_up_tasks = vec!["task A".into(), "task B".into()];
out.blocked_on = vec!["disk full".into()];
let json = serde_json::to_value(&out).expect("serialize");
assert_eq!(json["completion"], "partial");
assert_eq!(json["recommend"], "continue");
assert_eq!(json["follow_up_tasks"][0], "task A");
assert_eq!(json["blocked_on"][0], "disk full");
}
#[test]
fn pending_outcome_stash_roundtrip() {
let session = "test:stash:roundtrip";
assert!(drain_pending_outcome(session).is_none());
let outcome = make_outcome(Completion::Full, Recommend::Ship);
stage_pending_outcome(session, outcome);
let drained = drain_pending_outcome(session).expect("staged outcome");
assert_eq!(drained.completion, Completion::Full);
assert_eq!(drained.recommend, Recommend::Ship);
assert!(drain_pending_outcome(session).is_none());
}
#[test]
fn decide_action_done_completes() {
assert_eq!(
decide_action(&TaskOutcome::Done, 1, 10),
DispatchAction::Complete
);
}
#[test]
fn decide_action_structured_ship_completes() {
let outcome = TaskOutcome::Structured(make_outcome(Completion::Full, Recommend::Ship));
assert_eq!(decide_action(&outcome, 1, 10), DispatchAction::Complete);
}
#[test]
fn decide_action_structured_needs_human_completes() {
let outcome =
TaskOutcome::Structured(make_outcome(Completion::Partial, Recommend::NeedsHuman));
assert_eq!(decide_action(&outcome, 1, 10), DispatchAction::Complete);
}
#[test]
fn decide_action_structured_abandon_fails() {
let outcome = TaskOutcome::Structured(make_outcome(Completion::Failed, Recommend::Abandon));
assert_eq!(decide_action(&outcome, 1, 10), DispatchAction::Fail);
}
#[test]
fn decide_action_structured_retry_continues() {
let outcome = TaskOutcome::Structured(make_outcome(Completion::Minimal, Recommend::Retry));
match decide_action(&outcome, 2, 10) {
DispatchAction::AutoContinue { prompt, slow } => {
assert!(prompt.contains("Retry"));
assert!(slow, "retry should rate-limit");
}
other => panic!("expected AutoContinue, got {other:?}"),
}
}
#[test]
fn decide_action_structured_retry_at_max_fails() {
let outcome = TaskOutcome::Structured(make_outcome(Completion::Minimal, Recommend::Retry));
assert_eq!(decide_action(&outcome, 5, 5), DispatchAction::Fail);
}
#[test]
fn decide_action_structured_continue_with_followups_spawns() {
let mut out = make_outcome(Completion::Partial, Recommend::Continue);
out.follow_up_tasks = vec!["step 1".into(), "step 2".into()];
let outcome = TaskOutcome::Structured(out);
match decide_action(&outcome, 1, 10) {
DispatchAction::Spawn { tasks } => {
assert_eq!(tasks, vec!["step 1".to_string(), "step 2".to_string()]);
}
other => panic!("expected Spawn, got {other:?}"),
}
}
#[test]
fn decide_action_structured_continue_without_followups_completes() {
let outcome =
TaskOutcome::Structured(make_outcome(Completion::Partial, Recommend::Continue));
assert_eq!(decide_action(&outcome, 1, 10), DispatchAction::Complete);
}
#[test]
fn decide_action_needs_input_completes() {
let outcome = TaskOutcome::NeedsInput("which file?".into());
assert_eq!(decide_action(&outcome, 1, 10), DispatchAction::Complete);
}
#[test]
fn decide_action_partial_continues_under_budget() {
match decide_action(&TaskOutcome::Partial, 2, 10) {
DispatchAction::AutoContinue { prompt, slow } => {
assert!(prompt.contains("Continue"));
assert!(!slow, "partial should not rate-limit");
}
other => panic!("expected AutoContinue, got {other:?}"),
}
}
#[test]
fn decide_action_partial_at_max_completes() {
assert_eq!(
decide_action(&TaskOutcome::Partial, 5, 5),
DispatchAction::Complete
);
}
#[test]
fn decide_action_error_slow_retry() {
match decide_action(&TaskOutcome::Error("rate limit".into()), 1, 10) {
DispatchAction::AutoContinue { slow, .. } => {
assert!(slow, "Error should rate-limit before retry");
}
other => panic!("expected AutoContinue, got {other:?}"),
}
}
fn fake_reply(text: &str) -> rsclaw_agent::AgentReply {
rsclaw_agent::AgentReply {
text: text.to_string(),
is_empty: text.is_empty(),
tool_calls: None,
images: vec![],
files: vec![],
pending_analysis: None,
needs_outer_done_emit: false,
outcome: rsclaw_agent::registry::ReplyOutcome::Ok,
}
}
#[test]
fn classify_chinese_stuck_phrase() {
let reply = fake_reply("抱歉,我无法完成这个任务");
assert!(matches!(classify_outcome(&reply), TaskOutcome::Stuck(_)));
}
#[test]
fn classify_chinese_partial_phrase() {
let reply = fake_reply("先做了一半,下一步来处理剩下的");
assert!(matches!(classify_outcome(&reply), TaskOutcome::Partial));
}
#[test]
fn classify_empty_reply_is_stuck() {
assert!(matches!(
classify_outcome(&fake_reply("")),
TaskOutcome::Stuck(_)
));
}
#[test]
fn classify_plain_reply_is_done() {
let reply = fake_reply("Sure, here's the result: 42.");
assert!(matches!(classify_outcome(&reply), TaskOutcome::Done));
}
}
pub struct GatewayBriefingSink;
impl rsclaw_types::BriefingSink for GatewayBriefingSink {
fn submit_briefing(
&self,
session_key: &str,
text: &str,
channel: &str,
peer_id: &str,
chat_id: &str,
is_group: bool,
priority: Priority,
) -> anyhow::Result<(String, bool)> {
let tq = get_task_queue()
.ok_or_else(|| anyhow::anyhow!("task queue not installed"))?;
submit_to_queue(&tq, session_key, text, channel, peer_id, chat_id, is_group, priority)
}
fn push_outbound(
&self,
channel: &str,
account: Option<&str>,
msg: OutboundMessage,
) -> Result<(), String> {
push_outbound(channel, account, msg)
}
}
pub struct GatewayPluginBackgroundHost;
impl rsclaw_plugin::PluginBackgroundHost for GatewayPluginBackgroundHost {
fn cron_register(
&self,
plugin: String,
name: String,
schedule_json: String,
ctx: Option<rsclaw_plugin::PluginInvocationContext>,
) -> futures::future::BoxFuture<'static, std::result::Result<String, String>> {
Box::pin(async move {
let key = plugin_background_key("cron", &plugin, &name, None, ctx.as_ref());
if !claim_plugin_background_key(&key) {
return Ok("already_registered".to_owned());
}
let schedule: Value = serde_json::from_str(&schedule_json)
.map_err(|e| format!("plugin cron schedule JSON invalid: {e}"))?;
let slots = schedule
.get("slots")
.and_then(Value::as_object)
.ok_or_else(|| "plugin cron schedule requires object field `slots`".to_owned())?;
let prompt_template = schedule
.get("promptTemplate")
.and_then(Value::as_str)
.unwrap_or("Run plugin cron job {plugin}.{name} slot {slot}.")
.to_owned();
let session_key = schedule
.get("sessionKey")
.and_then(Value::as_str)
.map(str::to_owned);
let mut count = 0usize;
for (slot, hhmm) in slots {
let Some(hhmm) = hhmm.as_str() else {
continue;
};
let Some((hour, minute)) = parse_hhmm(hhmm) else {
continue;
};
let ctx_for_task = ctx.clone();
let prompt_template = prompt_template.clone();
let plugin = plugin.clone();
let name = name.clone();
let slot = slot.clone();
let session_key = session_key.clone();
tokio::spawn(async move {
loop {
let wait = duration_until_shanghai(hour, minute);
tokio::time::sleep(wait).await;
let prompt = prompt_template
.replace("{plugin}", &plugin)
.replace("{name}", &name)
.replace("{slot}", &slot);
let session = session_key
.clone()
.or_else(|| ctx_for_task.as_ref().map(|c| c.session_key.clone()))
.unwrap_or_else(|| format!("plugin:{plugin}:{name}:{slot}"));
if let Err(e) = submit_plugin_agent_turn(&session, &prompt, "{}", ctx_for_task.as_ref()) {
warn!(plugin, name, slot, error = %e, "plugin cron submit failed");
}
}
});
count += 1;
}
Ok(format!("registered {count} cron slot(s)"))
})
}
fn sse_subscribe(
&self,
plugin: String,
name: String,
url: String,
headers_json: String,
resume_key: String,
ctx: Option<rsclaw_plugin::PluginInvocationContext>,
) -> futures::future::BoxFuture<'static, std::result::Result<String, String>> {
Box::pin(async move {
let key = plugin_background_key("sse", &plugin, &name, Some(&url), ctx.as_ref());
if !claim_plugin_background_key(&key) {
return Ok("already_registered".to_owned());
}
let Some(ctx) = ctx else {
return Err("plugin SSE subscribe requires invocation context".to_owned());
};
tokio::spawn(async move {
run_plugin_sse(plugin, name, url, headers_json, resume_key, ctx).await;
});
Ok("registered".to_owned())
})
}
fn sse_status(
&self,
plugin: String,
name: String,
ctx: Option<rsclaw_plugin::PluginInvocationContext>,
) -> futures::future::BoxFuture<'static, std::result::Result<String, String>> {
Box::pin(async move { Ok(plugin_sse_status_json(&plugin, &name, ctx.as_ref())) })
}
fn push_outbound(
&self,
channel: String,
peer_id: String,
message_json: String,
ctx: Option<rsclaw_plugin::PluginInvocationContext>,
) -> futures::future::BoxFuture<'static, std::result::Result<String, String>> {
Box::pin(async move { push_plugin_outbound(&channel, &peer_id, &message_json, ctx.as_ref()) })
}
fn submit_agent_turn(
&self,
session_key: String,
prompt: String,
route_json: String,
ctx: Option<rsclaw_plugin::PluginInvocationContext>,
) -> futures::future::BoxFuture<'static, std::result::Result<String, String>> {
Box::pin(async move {
let session = if session_key.trim().is_empty() {
ctx.as_ref()
.map(|c| c.session_key.as_str())
.unwrap_or("plugin:agent-turn")
} else {
session_key.as_str()
};
submit_plugin_agent_turn(session, &prompt, &route_json, ctx.as_ref())
})
}
}
fn claim_plugin_background_key(key: &str) -> bool {
let keys = plugin_background_keys();
let Ok(mut guard) = keys.write() else {
return false;
};
guard.insert(key.to_owned())
}
fn plugin_background_key_prefix(
kind: &str,
plugin: &str,
name: &str,
ctx: Option<&rsclaw_plugin::PluginInvocationContext>,
) -> String {
let ctx = ctx
.map(plugin_invocation_context_key)
.unwrap_or_else(|| "global".to_owned());
format!("{kind}:{plugin}:{name}:{ctx}")
}
fn plugin_background_key(
kind: &str,
plugin: &str,
name: &str,
extra: Option<&str>,
ctx: Option<&rsclaw_plugin::PluginInvocationContext>,
) -> String {
let ctx = ctx
.map(plugin_invocation_context_key)
.unwrap_or_else(|| "global".to_owned());
match extra {
Some(extra) if !extra.is_empty() => format!("{kind}:{plugin}:{name}:{ctx}:{extra}"),
_ => format!("{kind}:{plugin}:{name}:{ctx}"),
}
}
fn plugin_invocation_context_key(ctx: &rsclaw_plugin::PluginInvocationContext) -> String {
format!(
"agent={}:channel={}:peer={}:chat={}:session={}",
ctx.agent_id, ctx.channel, ctx.peer_id, ctx.chat_id, ctx.session_key
)
}
fn plugin_sse_status_json(
plugin: &str,
name: &str,
ctx: Option<&rsclaw_plugin::PluginInvocationContext>,
) -> String {
let prefix = plugin_background_key_prefix("sse", plugin, name, ctx);
let count = plugin_background_keys()
.read()
.map(|keys| {
keys.iter()
.filter(|key| *key == &prefix || key.starts_with(&format!("{prefix}:")))
.count()
})
.unwrap_or(0);
serde_json::json!({
"ok": true,
"name": name,
"active": count > 0,
"count": count,
})
.to_string()
}
fn parse_hhmm(raw: &str) -> Option<(u32, u32)> {
let (h, m) = raw.split_once(':')?;
let hour = h.parse::<u32>().ok()?;
let minute = m.parse::<u32>().ok()?;
if hour < 24 && minute < 60 {
Some((hour, minute))
} else {
None
}
}
fn duration_until_shanghai(hour: u32, minute: u32) -> Duration {
use chrono::{Datelike, TimeZone};
let tz = chrono_tz::Asia::Shanghai;
let now = chrono::Utc::now().with_timezone(&tz);
let today = tz
.with_ymd_and_hms(now.year(), now.month(), now.day(), hour, minute, 0)
.single()
.unwrap_or(now);
let next = if today > now {
today
} else {
today + chrono::Duration::days(1)
};
(next - now).to_std().unwrap_or(Duration::from_secs(60))
}
fn submit_plugin_agent_turn(
session_key: &str,
prompt: &str,
route_json: &str,
ctx: Option<&rsclaw_plugin::PluginInvocationContext>,
) -> Result<String, String> {
let route: Value = serde_json::from_str(route_json).unwrap_or(Value::Null);
let route_ctx = route.get("context").unwrap_or(&Value::Null);
let channel = route
.get("channel")
.and_then(Value::as_str)
.or_else(|| route_ctx.get("channel").and_then(Value::as_str))
.or_else(|| ctx.map(|c| c.channel.as_str()))
.unwrap_or("plugin");
let peer_id = route
.get("peer_id")
.and_then(Value::as_str)
.or_else(|| route_ctx.get("peer_id").and_then(Value::as_str))
.or_else(|| ctx.map(|c| c.peer_id.as_str()))
.unwrap_or("plugin");
let chat_id = route
.get("chat_id")
.and_then(Value::as_str)
.or_else(|| route_ctx.get("chat_id").and_then(Value::as_str))
.or_else(|| ctx.map(|c| c.chat_id.as_str()))
.unwrap_or(peer_id);
let is_group = route
.get("is_group")
.and_then(Value::as_bool)
.or_else(|| route_ctx.get("is_group").and_then(Value::as_bool))
.or_else(|| ctx.map(|c| c.is_group))
.unwrap_or(false);
let tq = get_task_queue().ok_or_else(|| "task queue not installed".to_owned())?;
let (task_id, merged) = submit_to_queue(
&tq,
session_key,
prompt,
channel,
peer_id,
chat_id,
is_group,
Priority::Cron,
)
.map_err(|e| e.to_string())?;
Ok(serde_json::json!({ "taskId": task_id, "merged": merged }).to_string())
}
fn push_plugin_outbound(
channel: &str,
peer_id: &str,
message_json: &str,
ctx: Option<&rsclaw_plugin::PluginInvocationContext>,
) -> Result<String, String> {
let message: Value = serde_json::from_str(message_json)
.map_err(|e| format!("plugin outbound message JSON invalid: {e}"))?;
let text = message
.get("text")
.and_then(Value::as_str)
.unwrap_or("")
.to_owned();
let images = json_array_strings(&message, "images");
let files = json_array_files(&message, "files");
let account = message
.get("account")
.and_then(Value::as_str)
.map(str::to_owned);
let target_id = if peer_id.is_empty() {
ctx.map(|c| c.peer_id.clone()).unwrap_or_default()
} else {
peer_id.to_owned()
};
let batch_targets = json_array_strings(&message, "batch_targets");
let target_id = if batch_targets.is_empty() {
target_id
} else {
format!(
"{}{}",
rsclaw_types::OUTBOUND_BATCH_PREFIX,
batch_targets.join(",")
)
};
let msg = OutboundMessage {
target_id,
is_group: message
.get("is_group")
.and_then(Value::as_bool)
.or_else(|| ctx.map(|c| c.is_group))
.unwrap_or(false),
text,
reply_to: None,
images,
files,
channel: Some(channel.to_owned()),
account,
};
let account = msg.account.clone();
push_outbound(channel, account.as_deref(), msg)?;
Ok("dispatched".to_owned())
}
fn json_array_strings(value: &Value, key: &str) -> Vec<String> {
value
.get(key)
.and_then(Value::as_array)
.map(|arr| {
arr.iter()
.filter_map(Value::as_str)
.map(str::to_owned)
.collect()
})
.unwrap_or_default()
}
fn json_array_files(value: &Value, key: &str) -> Vec<(String, String, String)> {
value
.get(key)
.and_then(Value::as_array)
.map(|arr| arr.iter().filter_map(json_file_tuple).collect())
.unwrap_or_default()
}
fn json_file_tuple(value: &Value) -> Option<(String, String, String)> {
let (path, filename, mime) = if let Some(path) = value.as_str() {
let filename = std::path::Path::new(path)
.file_name()
.and_then(|s| s.to_str())
.unwrap_or("plugin-file")
.to_owned();
(path.to_owned(), filename, "application/octet-stream".to_owned())
} else {
let obj = value.as_object()?;
let path = obj.get("path").and_then(Value::as_str)?.to_owned();
let filename = obj
.get("filename")
.and_then(Value::as_str)
.map(str::to_owned)
.unwrap_or_else(|| {
std::path::Path::new(&path)
.file_name()
.and_then(|s| s.to_str())
.unwrap_or("plugin-file")
.to_owned()
});
let mime = obj
.get("mime")
.or_else(|| obj.get("mimeType"))
.and_then(Value::as_str)
.unwrap_or("application/octet-stream")
.to_owned();
(path, filename, mime)
};
Some((filename, mime, path))
}
async fn run_plugin_sse(
plugin: String,
name: String,
url: String,
headers_json: String,
resume_key: String,
ctx: rsclaw_plugin::PluginInvocationContext,
) {
let client = match reqwest::Client::builder()
.connect_timeout(Duration::from_secs(5))
.build()
{
Ok(client) => client,
Err(e) => {
warn!(plugin, name, error = %e, "plugin SSE client build failed");
return;
}
};
let headers: Value = serde_json::from_str(&headers_json).unwrap_or_else(|_| Value::Null);
let mut backoff = Duration::from_secs(1);
loop {
let mut req = client.get(&url).header("Accept", "text/event-stream");
if let Some(obj) = headers.as_object() {
for (k, v) in obj {
if let Some(s) = v.as_str() {
req = req.header(k, s);
}
}
}
match req.send().await {
Ok(resp) => match resp.error_for_status() {
Ok(resp) => {
backoff = Duration::from_secs(1);
let mut buf = String::new();
let mut event_name = String::new();
let mut data_lines: Vec<String> = Vec::new();
let mut stream = resp.bytes_stream();
while let Some(chunk) = stream.next().await {
match chunk {
Ok(bytes) => {
buf.push_str(&String::from_utf8_lossy(&bytes));
while let Some(nl) = buf.find('\n') {
let line = buf[..nl].trim_end_matches('\r').to_owned();
buf.drain(..=nl);
if line.is_empty() {
if !data_lines.is_empty() {
let data = data_lines.join("\n");
let text = format_plugin_sse_text(&plugin, &name, &event_name, &data);
let _ = push_plugin_outbound(
&ctx.channel,
&ctx.peer_id,
&serde_json::json!({ "text": text }).to_string(),
Some(&ctx),
);
}
event_name.clear();
data_lines.clear();
} else if let Some(rest) = line.strip_prefix("event: ") {
event_name = rest.trim().to_owned();
} else if let Some(rest) = line.strip_prefix("data: ") {
data_lines.push(rest.to_owned());
}
}
}
Err(e) => {
warn!(plugin, name, resume_key, error = %e, "plugin SSE read failed");
break;
}
}
}
}
Err(e) => warn!(plugin, name, error = %e, "plugin SSE HTTP status error"),
},
Err(e) => warn!(plugin, name, error = %e, "plugin SSE connect failed"),
}
tokio::time::sleep(backoff).await;
backoff = (backoff * 2).min(Duration::from_secs(60));
}
}
fn format_plugin_sse_text(plugin: &str, name: &str, event_name: &str, data: &str) -> String {
let label = if event_name.is_empty() { "event" } else { event_name };
if let Ok(v) = serde_json::from_str::<Value>(data) {
if let Some(code) = v.get("code").and_then(Value::as_str) {
let stock_name = v.get("name").and_then(Value::as_str).unwrap_or("");
let filter = v.get("filter").and_then(Value::as_str).unwrap_or(label);
return format!("[{plugin}/{name}] {filter}: {code} {stock_name}");
}
}
format!("[{plugin}/{name}] {label}: {data}")
}
pub struct GatewayTaskQueueHost;
impl rsclaw_types::TaskQueueHost for GatewayTaskQueueHost {
fn submit_task(
&self,
session_key: &str,
message: QueuedMessage,
priority: Priority,
max_turns: u32,
ttl_secs: u64,
) -> anyhow::Result<(String, bool)> {
let manager = get_task_queue()
.ok_or_else(|| anyhow::anyhow!("task queue not available (gateway not started?)"))?;
manager.submit_task(session_key, message, priority, max_turns, ttl_secs)
}
}
#[cfg(test)]
mod plugin_background_tests {
use super::*;
fn ctx(peer: &str) -> rsclaw_plugin::PluginInvocationContext {
rsclaw_plugin::PluginInvocationContext {
target_id: peer.to_owned(),
channel: "test".to_owned(),
agent_id: "main".to_owned(),
peer_id: peer.to_owned(),
chat_id: peer.to_owned(),
session_key: format!("agent:main:test:direct:{peer}"),
is_group: false,
}
}
#[test]
fn plugin_background_keys_are_peer_scoped() {
let a = ctx("peer-a");
let b = ctx("peer-b");
let key_a = plugin_background_key("cron", "market", "market.briefing", None, Some(&a));
let key_a_again = plugin_background_key("cron", "market", "market.briefing", None, Some(&a));
let key_b = plugin_background_key("cron", "market", "market.briefing", None, Some(&b));
assert_eq!(key_a, key_a_again);
assert_ne!(key_a, key_b);
}
#[test]
fn plugin_background_sse_keys_include_url_and_peer() {
let a = ctx("peer-a");
let b = ctx("peer-b");
let url = "https://plugin.example/v1/stream?filter=alpha";
let key_a = plugin_background_key("sse", "market", "market.alpha", Some(url), Some(&a));
let key_b = plugin_background_key("sse", "market", "market.alpha", Some(url), Some(&b));
let key_other_url = plugin_background_key(
"sse",
"market",
"market.alpha",
Some("https://plugin.example/v1/stream?filter=beta"),
Some(&a),
);
assert_ne!(key_a, key_b);
assert_ne!(key_a, key_other_url);
}
#[test]
fn plugin_sse_status_is_context_scoped() {
let a = ctx("peer-status-a");
let b = ctx("peer-status-b");
let url = "https://plugin.example/v1/stream?filter=alpha";
let status_name = "market.alpha.status_test";
let before: Value =
serde_json::from_str(&plugin_sse_status_json("market", status_name, Some(&a)))
.expect("status JSON before");
assert_eq!(before["active"].as_bool(), Some(false));
assert_eq!(before["count"].as_u64(), Some(0));
let key = plugin_background_key("sse", "market", status_name, Some(url), Some(&a));
assert!(claim_plugin_background_key(&key));
let active: Value =
serde_json::from_str(&plugin_sse_status_json("market", status_name, Some(&a)))
.expect("status JSON active");
assert_eq!(active["active"].as_bool(), Some(true));
assert_eq!(active["count"].as_u64(), Some(1));
let other_peer: Value =
serde_json::from_str(&plugin_sse_status_json("market", status_name, Some(&b)))
.expect("status JSON other peer");
assert_eq!(other_peer["active"].as_bool(), Some(false));
assert_eq!(other_peer["count"].as_u64(), Some(0));
}
}