use std::{
collections::HashMap,
sync::{Arc, OnceLock, RwLock},
time::Duration,
};
use anyhow::Result;
use md5::{Digest, Md5};
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, Notify};
use tracing::{error, info, warn};
use crate::{
agent::{AgentMessage, AgentRegistry, FileAttachment, ImageAttachment},
channel::OutboundMessage,
store::redb_store::RedbStore,
};
/// Scheduling priority of a queued task.
///
/// Lower discriminant = higher priority; `Ord` is derived from declaration
/// order, so `System < Cron < User` and the `repr(u8)` values double as
/// sort keys. Do not reorder variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[repr(u8)]
pub enum Priority {
    System = 0,
    Cron = 1,
    User = 2,
}
/// Lifecycle state of a task as persisted in the queue store.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskStatus {
    /// Waiting to be dequeued by the worker.
    Pending,
    /// Dequeued and currently being processed (revived to Pending on crash recovery).
    Running,
    /// Finished — either successfully or after hitting the turn cap.
    Done,
    /// Failed; presumably still retryable until retries are exhausted — see `fail()`.
    Failed,
    /// Terminal failure; staged attachment files are cleaned up at this point.
    Dead,
}
/// Heuristic classification of a single agent reply (see `classify_outcome`).
#[derive(Debug)]
pub enum TaskOutcome {
    /// Reply looks final; the task is considered complete.
    Done,
    /// Reply indicates unfinished work; the worker auto-continues.
    Partial,
    /// Agent appears blocked; the payload is the matched phrase / reason.
    Stuck(String),
    /// Transient error phrase detected (rate limit, context overflow, …);
    /// the payload is the matched phrase.
    Error(String),
}
/// A file attachment staged on disk while its message waits in the queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueuedFile {
    /// Original, user-facing file name.
    pub filename: String,
    /// Path of the staged copy under the staging directory (see `stage_file`).
    pub path: String,
    pub mime_type: String,
}
/// One inbound chat message waiting in (or merged into) a queued task.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueuedMessage {
    pub text: String,
    /// Peer/user id of the sender.
    pub sender: String,
    /// Channel the message arrived on; used as the routing key for replies.
    pub channel: String,
    /// Optional account qualifier; senders may be registered as "channel/account".
    #[serde(default)]
    pub account: Option<String>,
    pub chat_id: String,
    pub is_group: bool,
    /// Message id to quote when replying, if the channel supports it.
    #[serde(default)]
    pub reply_to: Option<String>,
    /// Unix timestamp (seconds) at enqueue time.
    pub timestamp: i64,
    /// Inline image payloads. Encoding/MIME is not recorded here; the worker
    /// forwards them as "image/png" — TODO confirm that assumption holds.
    pub images: Vec<String>,
    /// Attachments staged on disk (see `stage_file` / `unstage_file`).
    pub files: Vec<QueuedFile>,
}
/// A persisted unit of work: one or more merged messages plus scheduling,
/// retry, expiry, and auto-continue bookkeeping.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueuedTask {
    pub id: String,
    /// Conversation/session the task belongs to; scope for dedup and merging.
    pub session_key: String,
    /// Messages merged into this task (always at least one).
    pub messages: Vec<QueuedMessage>,
    pub priority: Priority,
    pub status: TaskStatus,
    /// Retry attempts consumed so far.
    pub retries: u32,
    pub max_retries: u32,
    /// Agent turns already executed; lets a revived task resume mid-run.
    #[serde(default)]
    pub turns: u32,
    /// Auto-continue turn budget; 0 = chat mode (no auto-continue loop).
    #[serde(default = "default_max_turns")]
    pub max_turns: u32,
    pub created_at: i64,
    pub updated_at: i64,
    /// Expiry window in seconds from `created_at`; 0 disables expiry.
    pub ttl_secs: u64,
    /// Hash of the (prefix-stripped) text, used for duplicate detection.
    pub content_hash: String,
    pub error: Option<String>,
    /// Whether the final reply reached the user (gates pending notifications).
    #[serde(default)]
    pub notified: bool,
    /// Most recent non-empty agent reply text.
    #[serde(default)]
    pub last_reply: Option<String>,
}
impl QueuedTask {
    /// Build a fresh `Pending` task wrapping a single message, with default
    /// retry (3), TTL (1h), and chat-mode turn budget.
    pub fn new(session_key: String, message: QueuedMessage, priority: Priority) -> Self {
        let created = chrono::Utc::now().timestamp();
        let content_hash = compute_hash(&message.text);
        Self {
            id: uuid::Uuid::new_v4().to_string(),
            session_key,
            messages: vec![message],
            priority,
            status: TaskStatus::Pending,
            retries: 0,
            max_retries: 3,
            turns: 0,
            max_turns: default_max_turns(),
            created_at: created,
            updated_at: created,
            ttl_secs: 3600,
            content_hash,
            error: None,
            notified: false,
            last_reply: None,
        }
    }

    /// Whether the task has outlived its TTL; a TTL of 0 disables expiry.
    pub fn is_expired(&self) -> bool {
        match self.ttl_secs {
            0 => false,
            ttl => chrono::Utc::now().timestamp() - self.created_at > ttl as i64,
        }
    }

    /// All message texts joined by a separator; a single message is returned
    /// as-is without the separator.
    pub fn merged_text(&self) -> String {
        match self.messages.as_slice() {
            [only] => only.text.clone(),
            many => many
                .iter()
                .map(|m| m.text.as_str())
                .collect::<Vec<_>>()
                .join("\n\n---\n\n"),
        }
    }
}
/// Serde default for `QueuedTask::max_turns`: 0 means "chat mode" (no
/// auto-continue). This is deliberately different from
/// `TASK_DEFAULT_MAX_TURNS`, which applies only once a `/task` prefix is
/// actually parsed.
fn default_max_turns() -> u32 {
    0
}
/// Turn budget applied when `/task` is used without an explicit `--turns`.
pub const TASK_DEFAULT_MAX_TURNS: u32 = 10;
/// Task TTL applied when `--timeout` is absent (1 hour).
pub const TASK_DEFAULT_TTL_SECS: u64 = 3600;

/// Parse and strip a leading `/task [--turns N|-n N] [--timeout DUR|-t DUR]`
/// prefix from `text` in place.
///
/// Returns `(max_turns, ttl_secs)`. Plain chat messages (no `/task` prefix)
/// yield `(0, TASK_DEFAULT_TTL_SECS)`; 0 turns means chat mode. Unicode
/// em/en/figure/horizontal-bar dashes are normalized to `--` so flags typed
/// on mobile keyboards still parse; this normalization is (intentionally,
/// matching prior behavior) applied to non-task text as well.
fn parse_task_prefix(text: &mut String) -> (u32, u64) {
    let normalized: String =
        text.replace(['\u{2014}', '\u{2013}', '\u{2012}', '\u{2015}'], "--");
    let trimmed = normalized.trim();
    // Accept "/task" alone or "/task" followed by any whitespace (previously
    // only a literal space was recognized, so "/task\t..." fell through to
    // chat mode). Reject lookalikes such as "/taskforce".
    let rest = match trimmed.strip_prefix("/task") {
        Some(r) if r.is_empty() || r.starts_with(char::is_whitespace) => r.trim(),
        _ => {
            // Not a task command: keep the (dash-normalized) text untouched.
            *text = normalized;
            return (0, TASK_DEFAULT_TTL_SECS);
        }
    };
    let mut max_turns = TASK_DEFAULT_MAX_TURNS;
    let mut ttl_secs = TASK_DEFAULT_TTL_SECS;
    let mut msg_parts: Vec<&str> = Vec::new();
    let mut iter = rest.split_whitespace().peekable();
    while let Some(tok) = iter.next() {
        match tok {
            "--turns" | "-n" => {
                if let Some(val) = iter.peek().and_then(|v| v.parse::<u32>().ok()) {
                    max_turns = val;
                    iter.next();
                    continue;
                }
                // Flag without a numeric value: keep it as message text.
                msg_parts.push(tok);
            }
            "--timeout" | "-t" => {
                if let Some(val) = iter.peek().and_then(|v| parse_duration_str(v)) {
                    ttl_secs = val;
                    iter.next();
                    continue;
                }
                msg_parts.push(tok);
            }
            _ => msg_parts.push(tok),
        }
    }
    // Note: re-joining collapses runs of whitespace in the message body.
    *text = msg_parts.join(" ");
    (max_turns, ttl_secs)
}

/// Parse a compact duration such as `"1h30m"`, `"45s"`, or a bare number of
/// seconds (`"90"`). Units: h/H, m/M, s/S.
///
/// Returns `None` for zero, malformed input, or values that would overflow
/// `u64` (checked arithmetic — a huge count previously panicked in debug
/// builds and wrapped in release).
fn parse_duration_str(s: &str) -> Option<u64> {
    let mut total: u64 = 0;
    let mut num_buf = String::new();
    for c in s.chars() {
        if c.is_ascii_digit() {
            num_buf.push(c);
        } else {
            let n: u64 = num_buf.parse().ok()?;
            num_buf.clear();
            let unit: u64 = match c {
                'h' | 'H' => 3600,
                'm' | 'M' => 60,
                's' | 'S' => 1,
                _ => return None,
            };
            total = n.checked_mul(unit).and_then(|v| total.checked_add(v))?;
        }
    }
    // Trailing bare digits count as seconds.
    if !num_buf.is_empty() {
        total = total.checked_add(num_buf.parse::<u64>().ok()?)?;
    }
    (total > 0).then_some(total)
}
/// MD5 hex digest of `text`, used purely as a dedup fingerprint
/// (not for any security purpose).
fn compute_hash(text: &str) -> String {
    hex::encode(Md5::digest(text.as_bytes()))
}
/// Snapshot of queue depth per task status (see `TaskQueueManager::stats`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueStats {
    pub pending: usize,
    pub running: usize,
    pub done: usize,
    pub failed: usize,
    pub dead: usize,
}
/// Persistent task queue backed by `RedbStore`.
pub struct TaskQueueManager {
    store: Arc<RedbStore>,
    // Signalled on every enqueue/merge so the worker can cut its poll sleep short.
    notify: Notify,
}
/// Map of channel name (optionally qualified as "name/account") to its
/// outbound message sender.
type ChannelSendersMap = Arc<RwLock<HashMap<String, mpsc::Sender<OutboundMessage>>>>;
/// Process-wide registry of outbound channel senders; set once at startup.
static CHANNEL_SENDERS: OnceLock<ChannelSendersMap> = OnceLock::new();
/// Process-wide handle to the queue manager; set once at startup.
static TASK_QUEUE: OnceLock<Arc<TaskQueueManager>> = OnceLock::new();
/// Install the global outbound-sender map. The first install wins; later
/// calls are ignored with a warning.
pub fn install_channel_senders(senders: ChannelSendersMap) {
    if CHANNEL_SENDERS.set(senders).is_ok() {
        return;
    }
    warn!("task_queue: channel senders already installed, ignoring duplicate install");
}
/// Install the global queue manager. The first install wins; later calls
/// are ignored with a warning.
pub fn install_task_queue(manager: Arc<TaskQueueManager>) {
    if TASK_QUEUE.set(manager).is_ok() {
        return;
    }
    warn!("task_queue: manager already installed, ignoring duplicate install");
}
pub fn get_task_queue() -> Option<Arc<TaskQueueManager>> {
TASK_QUEUE.get().cloned()
}
/// Resolve an outbound sender from the global registry, preferring the
/// account-qualified key ("name/account") and falling back to the bare
/// channel name. Returns `None` if the registry is uninstalled, the lock
/// is poisoned, or no sender matches.
fn lookup_channel_sender_for(
    name: &str,
    account: Option<&str>,
) -> Option<mpsc::Sender<OutboundMessage>> {
    let map = CHANNEL_SENDERS.get()?.read().ok()?;
    account
        .filter(|a| !a.is_empty())
        .and_then(|acct| map.get(&format!("{name}/{acct}")).cloned())
        .or_else(|| map.get(name).cloned())
}
/// Build the acknowledgement text sent when a `/task` command is accepted.
fn task_ack_text(task_id: &str, max_turns: u32, ttl_secs: u64, lang: &str) -> String {
    // Render the TTL in the largest unit that divides it evenly.
    let ttl_human = match ttl_secs {
        t if t >= 3600 && t % 3600 == 0 => format!("{}h", t / 3600),
        t if t >= 60 && t % 60 == 0 => format!("{}m", t / 60),
        t => format!("{t}s"),
    };
    match lang {
        "zh" => format!(
            "任务已收到,开始处理(最多 {max_turns} 轮,超时 {ttl_human})\nID: {task_id}\n中止: /abort"
        ),
        _ => format!(
            "Task received, working on it (up to {max_turns} turns, timeout {ttl_human})\nID: {task_id}\nAbort: /abort"
        ),
    }
}
fn send_task_ack(task: &QueuedTask, max_turns: u32, ttl_secs: u64) {
let Some(msg) = task.messages.first() else { return };
let Some(tx) = lookup_channel_sender_for(&msg.channel, msg.account.as_deref()) else {
warn!(channel = %msg.channel, task_id = %task.id, "task_queue: channel sender not registered, ack dropped");
return;
};
let lang = crate::i18n::default_lang();
let ack = OutboundMessage {
target_id: msg.chat_id.clone(),
is_group: msg.is_group,
text: task_ack_text(&task.id, max_turns, ttl_secs, lang),
reply_to: msg.reply_to.clone(),
images: vec![],
files: vec![],
channel: Some(msg.channel.clone()),
account: msg.account.clone(),
};
if let Err(e) = tx.try_send(ack) {
warn!(channel = %msg.channel, task_id = %task.id, error = %e, "task_queue: ack send failed");
}
}
impl TaskQueueManager {
    /// Create a manager over the given persistent store.
    pub fn new(store: Arc<RedbStore>) -> Self {
        Self {
            store,
            notify: Notify::new(),
        }
    }

    /// Await the next enqueue/merge notification (used by the worker's idle loop).
    pub async fn notified(&self) {
        self.notify.notified().await;
    }

    /// Submit a message, parsing an optional `/task` prefix out of its text.
    ///
    /// Returns `(id, merged)`. `id` is the new task id, or the sentinel
    /// strings `"dedup"` (duplicate dropped) / `"merged"` (folded into an
    /// existing pending task, in which case `merged` is `true`).
    pub fn submit(
        &self,
        session_key: &str,
        mut message: QueuedMessage,
        priority: Priority,
    ) -> Result<(String, bool)> {
        // parse_task_prefix strips /task flags from message.text in place,
        // so the dedup hash below compares the cleaned payload.
        let (max_turns, ttl_secs) = parse_task_prefix(&mut message.text);
        let hash = compute_hash(&message.text);
        if self.store.has_duplicate(session_key, &hash)? {
            tracing::info!(session_key, "task_queue: duplicate message dropped");
            return Ok(("dedup".to_string(), false));
        }
        if self.store.merge_into_pending(session_key, &message)? {
            tracing::info!(session_key, "task_queue: message merged into pending task");
            self.notify.notify_one();
            return Ok(("merged".to_string(), true));
        }
        let mut task = QueuedTask::new(session_key.to_string(), message, priority);
        task.max_turns = max_turns;
        task.ttl_secs = ttl_secs;
        let id = task.id.clone();
        self.store.enqueue_task(&task)?;
        if max_turns > 0 {
            // max_turns > 0 means "/task mode": acknowledge receipt to the user.
            tracing::info!(session_key, task_id = %id, max_turns, ttl_secs, "task_queue: task enqueued (task mode)");
            send_task_ack(&task, max_turns, ttl_secs);
        } else {
            tracing::info!(session_key, task_id = %id, "task_queue: message enqueued");
        }
        self.notify.notify_one();
        Ok((id, false))
    }

    /// Submit with explicit turn/TTL parameters and no prefix parsing
    /// (caller has already decided this is task mode); always sends an ack.
    /// Same `(id, merged)` contract as `submit`.
    pub fn submit_task(
        &self,
        session_key: &str,
        message: QueuedMessage,
        priority: Priority,
        max_turns: u32,
        ttl_secs: u64,
    ) -> Result<(String, bool)> {
        let hash = compute_hash(&message.text);
        if self.store.has_duplicate(session_key, &hash)? {
            tracing::info!(session_key, "task_queue: duplicate task dropped");
            return Ok(("dedup".to_string(), false));
        }
        if self.store.merge_into_pending(session_key, &message)? {
            tracing::info!(session_key, "task_queue: message merged into pending task");
            self.notify.notify_one();
            return Ok(("merged".to_string(), true));
        }
        let mut task = QueuedTask::new(session_key.to_string(), message, priority);
        task.max_turns = max_turns;
        task.ttl_secs = ttl_secs;
        let id = task.id.clone();
        self.store.enqueue_task(&task)?;
        tracing::info!(session_key, task_id = %id, max_turns, ttl_secs, "task_queue: task enqueued");
        send_task_ack(&task, max_turns, ttl_secs);
        self.notify.notify_one();
        Ok((id, false))
    }

    /// Dequeue the next task, first purging expired ones.
    pub fn next(&self) -> Result<Option<QueuedTask>> {
        let cleaned = self.store.cleanup_expired_tasks()?;
        if cleaned > 0 {
            tracing::info!(count = cleaned, "task_queue: cleaned expired tasks");
        }
        self.store.dequeue_task()
    }

    /// Mark a task `Done`.
    pub fn complete(&self, task_id: &str) -> Result<()> {
        self.store.update_task_status(task_id, TaskStatus::Done)
    }

    /// Flip tasks stuck in `Running` (e.g. after a crash) back to `Pending`;
    /// returns how many were revived.
    pub fn recover_orphan_tasks(&self) -> Result<usize> {
        self.store.requeue_running_tasks()
    }

    /// Record that the task's final reply reached the user.
    pub fn mark_notified(&self, task_id: &str) -> Result<()> {
        self.store.mark_task_notified(task_id)
    }

    /// Persist the most recent agent reply text for the task.
    pub fn record_last_reply(&self, task_id: &str, text: &str) -> Result<()> {
        self.store.update_task_last_reply(task_id, text)
    }

    /// Persist the task's current turn counter (crash-resume bookkeeping).
    pub fn record_turn(&self, task_id: &str, turn: u32) -> Result<()> {
        self.store.update_task_turn(task_id, turn)
    }

    /// Whether a per-turn delivery idempotency key has already been recorded.
    pub fn is_idem_delivered(&self, key: &str) -> Result<bool> {
        self.store.is_idem_delivered(key)
    }

    /// Record a per-turn delivery idempotency key.
    pub fn mark_idem_delivered(&self, key: &str) -> Result<()> {
        self.store.mark_idem_delivered(key)
    }

    /// Drop idempotency keys older than `retention_secs`; returns the count removed.
    pub fn cleanup_idem_keys(&self, retention_secs: i64) -> Result<usize> {
        self.store.cleanup_idem_keys(retention_secs)
    }

    /// Done tasks for this session whose result was never delivered to the user.
    pub fn list_pending_notifications(
        &self,
        session_key: &str,
    ) -> Result<Vec<QueuedTask>> {
        let mut all = self.store.list_tasks(Some(TaskStatus::Done))?;
        all.retain(|t| t.session_key == session_key && !t.notified);
        Ok(all)
    }

    /// Record a failure; the store decides between `Failed` (retryable) and
    /// `Dead` based on `max_retries` and returns the resulting status.
    /// NOTE(review): the error string is currently discarded (`_error`) —
    /// confirm whether it should be persisted on the task.
    pub fn fail(&self, task_id: &str, _error: &str, max_retries: u32) -> Result<TaskStatus> {
        self.store.fail_task(task_id, max_retries)
    }

    /// Count tasks per status. Loads the full task list once, then makes
    /// one counting pass per status over the in-memory vec.
    pub fn stats(&self) -> Result<QueueStats> {
        let all = self.store.list_tasks(None)?;
        Ok(QueueStats {
            pending: all.iter().filter(|t| t.status == TaskStatus::Pending).count(),
            running: all.iter().filter(|t| t.status == TaskStatus::Running).count(),
            done: all.iter().filter(|t| t.status == TaskStatus::Done).count(),
            failed: all.iter().filter(|t| t.status == TaskStatus::Failed).count(),
            dead: all.iter().filter(|t| t.status == TaskStatus::Dead).count(),
        })
    }
}
/// Directory (under the application base dir) where queued attachments are
/// staged on disk while they wait in the queue.
fn staging_dir() -> std::path::PathBuf {
    let base = crate::config::loader::base_dir();
    base.join("var/data/queue/staging")
}
/// Persist an attachment to the staging dir and return its queue record.
///
/// The stored name is `"<uuid>_<sanitized filename>"`; path separators in
/// the user-supplied name are replaced with `_` so it stays inside the
/// staging directory.
pub fn stage_file(filename: &str, data: &[u8], mime_type: &str) -> Result<QueuedFile> {
    let dir = staging_dir();
    std::fs::create_dir_all(&dir)?;
    let safe_name = filename.replace(['/', '\\'], "_");
    let path = dir.join(format!("{}_{}", uuid::Uuid::new_v4(), safe_name));
    std::fs::write(&path, data)?;
    Ok(QueuedFile {
        filename: filename.to_string(),
        path: path.to_string_lossy().into_owned(),
        mime_type: mime_type.to_string(),
    })
}
fn unstage_file(qf: &QueuedFile) -> FileAttachment {
let data = std::fs::read(&qf.path).unwrap_or_default();
FileAttachment {
filename: qf.filename.clone(),
data,
mime_type: qf.mime_type.clone(),
}
}
/// Remove every staged attachment referenced by `task` (terminal cleanup);
/// removal failures are only logged at debug level.
fn cleanup_staged_files(task: &QueuedTask) {
    let staged = task.messages.iter().flat_map(|m| m.files.iter());
    for qf in staged {
        if let Err(e) = std::fs::remove_file(&qf.path) {
            tracing::debug!(path = %qf.path, "staging cleanup: {e}");
        }
    }
}
/// Convenience wrapper: wrap a plain text message (no attachments, no
/// account qualifier) in a `QueuedMessage` and submit it.
pub fn submit_to_queue(
    manager: &TaskQueueManager,
    session_key: &str,
    text: &str,
    channel: &str,
    peer_id: &str,
    chat_id: &str,
    is_group: bool,
    priority: Priority,
) -> Result<(String, bool)> {
    let now = chrono::Utc::now().timestamp();
    let message = QueuedMessage {
        text: text.to_string(),
        sender: peer_id.to_string(),
        channel: channel.to_string(),
        account: None,
        chat_id: chat_id.to_string(),
        is_group,
        reply_to: None,
        timestamp: now,
        images: vec![],
        files: vec![],
    };
    manager.submit(session_key, message, priority)
}
/// Heuristically classify an agent reply by substring scanning.
///
/// Precedence: empty payload → `Stuck("empty reply")`; error phrases →
/// `Error`; stuck phrases → `Stuck`; continuation phrases → `Partial`;
/// anything else → `Done`. Matching is case-insensitive on the trimmed text.
fn classify_outcome(reply: &crate::agent::AgentReply) -> TaskOutcome {
    // Transient failures worth retrying (rate limits, context overflow).
    const ERROR_PATTERNS: &[&str] = &[
        "rate limit",
        "rate_limit",
        "quota exceeded",
        "context length exceeded",
        "context_length_exceeded",
        "maximum context",
        "too many tokens",
    ];
    // Signs the agent is blocked or needs input.
    const STUCK_PATTERNS: &[&str] = &[
        "i can't",
        "i cannot",
        "i'm unable",
        "i am unable",
        "i don't know how",
        "i'm not sure how",
        "i need more information",
        "please provide",
        "could you clarify",
        "i'm stuck",
    ];
    // Signs of unfinished work that warrants an auto-continue turn.
    const CONTINUE_PATTERNS: &[&str] = &[
        "i'll continue",
        "i will continue",
        "next step",
        "let me continue",
        "continuing",
        "in progress",
        "working on",
        "todo",
        "to be continued",
        "not yet complete",
        "partially done",
    ];
    let text = reply.text.trim();
    if text.is_empty() && reply.images.is_empty() && reply.files.is_empty() {
        return TaskOutcome::Stuck("empty reply".to_string());
    }
    let lower = text.to_lowercase();
    if let Some(pat) = ERROR_PATTERNS.iter().copied().find(|&p| lower.contains(p)) {
        return TaskOutcome::Error(pat.to_string());
    }
    if let Some(pat) = STUCK_PATTERNS.iter().copied().find(|&p| lower.contains(p)) {
        return TaskOutcome::Stuck(pat.to_string());
    }
    if CONTINUE_PATTERNS.iter().any(|&p| lower.contains(p)) {
        return TaskOutcome::Partial;
    }
    TaskOutcome::Done
}
fn continuation_prompt(outcome: &TaskOutcome, turn: u32) -> String {
match outcome {
TaskOutcome::Partial => {
format!("[auto-continue turn {turn}] Continue from where you left off. Complete the remaining work.")
}
TaskOutcome::Stuck(reason) => {
format!(
"[auto-continue turn {turn}] Previous attempt got stuck ({reason}). \
Try a different approach. If truly impossible, explain why \
concisely and stop."
)
}
TaskOutcome::Error(err) => {
format!(
"[auto-continue turn {turn}] Previous attempt encountered an error: {err}. \
Retry or work around it."
)
}
TaskOutcome::Done => String::new(),
}
}
/// Background worker that dequeues tasks and drives them through agents.
pub struct TaskQueueWorker {
    manager: Arc<TaskQueueManager>,
    /// Routes messages to per-channel agents (with a default fallback).
    registry: Arc<AgentRegistry>,
    // Same shape as the global CHANNEL_SENDERS map, but injected explicitly.
    channel_senders: Arc<std::sync::RwLock<HashMap<String, mpsc::Sender<OutboundMessage>>>>,
    /// Coordinates graceful drain: the worker stops dequeuing and in-flight
    /// tasks hold work guards.
    shutdown: super::shutdown::ShutdownCoordinator,
    config: crate::config::runtime::RuntimeConfig,
}
impl TaskQueueWorker {
    /// Bundle the worker's dependencies; nothing runs until `run` is awaited.
    pub fn new(
        manager: Arc<TaskQueueManager>,
        registry: Arc<AgentRegistry>,
        channel_senders: Arc<std::sync::RwLock<HashMap<String, mpsc::Sender<OutboundMessage>>>>,
        shutdown: super::shutdown::ShutdownCoordinator,
        config: crate::config::runtime::RuntimeConfig,
    ) -> Self {
        Self {
            manager,
            registry,
            channel_senders,
            shutdown,
            config,
        }
    }

    /// Resolve an outbound sender, preferring the account-qualified key
    /// ("name/account") and falling back to the bare channel name.
    fn channel_tx_for(
        &self,
        name: &str,
        account: Option<&str>,
    ) -> Option<mpsc::Sender<OutboundMessage>> {
        if let Some(acct) = account.filter(|s| !s.is_empty()) {
            let key = format!("{name}/{acct}");
            if let Some(tx) = self
                .channel_senders
                .read()
                .ok()
                .and_then(|map| map.get(&key).cloned())
            {
                return Some(tx);
            }
        }
        self.channel_tx(name)
    }

    /// Look up the sender registered under the bare channel name.
    fn channel_tx(&self, name: &str) -> Option<mpsc::Sender<OutboundMessage>> {
        self.channel_senders
            .read()
            .expect("channel_senders lock poisoned")
            .get(name)
            .cloned()
    }

    /// Tell the user a task turn failed (text localized via i18n).
    /// Best-effort: a missing channel sender only logs a warning.
    async fn notify_user_failure(
        &self,
        channel_name: &str,
        account: Option<&str>,
        target: &str,
        is_group: bool,
        reply_to: Option<String>,
        turn: u32,
        reason: &str,
    ) {
        let Some(tx) = self.channel_tx_for(channel_name, account) else {
            warn!(channel = %channel_name, "no channel sender registered, failure notice dropped");
            return;
        };
        let text = crate::i18n::t_fmt(
            "task_notify_failure",
            crate::i18n::default_lang(),
            &[("reason", reason)],
        );
        let out = OutboundMessage {
            target_id: target.to_owned(),
            is_group,
            text,
            // Only quote the original message on the first turn.
            reply_to: if turn == 1 { reply_to } else { None },
            images: vec![],
            files: vec![],
            channel: Some(channel_name.to_owned()),
            account: account.map(str::to_owned),
        };
        if let Err(e) = tx.send(out).await {
            error!(channel = %channel_name, "failure notice send failed: {e}");
        }
    }

    /// Main worker loop: revive orphaned Running tasks, then repeatedly
    /// dequeue and spawn one processing job per task until drain is signaled.
    pub async fn run(self: Arc<Self>) {
        info!("task queue worker started");
        // Tasks left Running by a previous crash go back to Pending first.
        match self.manager.recover_orphan_tasks() {
            Ok(0) => {}
            Ok(n) => info!(count = n, "task queue worker: revived orphan Running tasks → Pending"),
            Err(e) => error!("task queue worker: orphan recovery failed: {e:#}"),
        }
        let mut idem_gc_counter: u32 = 0;
        loop {
            if self.shutdown.is_draining() {
                info!("task queue worker: drain signaled, stopping dequeue");
                break;
            }
            match self.manager.next() {
                Ok(Some(task)) => {
                    // Hold a work guard so graceful shutdown waits for this task.
                    let guard = self.shutdown.begin_work();
                    let worker = Arc::clone(&self);
                    tokio::spawn(async move {
                        worker.process_task(task).await;
                        drop(guard);
                    });
                    // Skips the GC bookkeeping below: the idem GC counter
                    // only advances on idle/error iterations.
                    continue;
                }
                Ok(None) => {
                    // Idle: wake on an enqueue notification or poll every 5s.
                    tokio::select! {
                        () = self.manager.notified() => {}
                        () = tokio::time::sleep(Duration::from_secs(5)) => {}
                    }
                }
                Err(e) => {
                    error!("task queue worker: dequeue error: {e:#}");
                    tokio::time::sleep(Duration::from_secs(1)).await;
                }
            }
            // Roughly hourly when idle (720 iterations × 5s sleep): drop
            // idempotency keys older than 24h.
            idem_gc_counter = idem_gc_counter.wrapping_add(1);
            if idem_gc_counter % 720 == 0 {
                match self.manager.cleanup_idem_keys(24 * 3600) {
                    Ok(0) => {}
                    Ok(n) => info!(count = n, "task queue worker: cleaned old idem keys"),
                    Err(e) => warn!("task queue worker: idem cleanup failed: {e:#}"),
                }
            }
        }
        info!("task queue worker exited");
    }

    /// Drive one task to completion: send the merged message to an agent,
    /// deliver the reply, and auto-continue for up to `max_turns` turns.
    async fn process_task(&self, task: QueuedTask) {
        let task_id = task.id.clone();
        let session_key = task.session_key.clone();
        let max_turns = task.max_turns;
        let Some(first_msg) = task.messages.first() else {
            error!(task_id = %task_id, "task queue worker: task has no messages, skipping");
            return;
        };
        // Routing/reply metadata is taken from the first merged message.
        let channel_name = first_msg.channel.clone();
        let account = first_msg.account.clone();
        let peer_id = first_msg.sender.clone();
        let chat_id = first_msg.chat_id.clone();
        let is_group = first_msg.is_group;
        let reply_to = first_msg.reply_to.clone();
        info!(
            task_id = %task_id,
            session_key = %session_key,
            channel = %channel_name,
            messages = task.messages.len(),
            max_turns,
            "task queue worker: processing task"
        );
        // Route by channel name; fall back to the default agent. With no
        // agent at all, the task is failed immediately.
        let handle = match self.registry.route(&channel_name) {
            Ok(h) => h,
            Err(_) => match self.registry.default_agent() {
                Ok(h) => h,
                Err(e) => {
                    error!(task_id = %task_id, "task queue worker: no agent for channel {channel_name}: {e:#}");
                    if let Err(fe) = self.manager.fail(&task_id, &format!("{e:#}"), task.max_retries) {
                        error!(task_id = %task_id, "task queue worker: fail() error: {fe:#}");
                    }
                    return;
                }
            },
        };
        // Turn 1 carries the merged text plus every queued attachment.
        let first_text = task.merged_text();
        let first_images: Vec<ImageAttachment> = task
            .messages
            .iter()
            .flat_map(|m| {
                // NOTE(review): queued images carry no MIME type, so PNG is
                // assumed here — confirm against the producers.
                m.images.iter().map(|data| ImageAttachment {
                    data: data.clone(),
                    mime_type: "image/png".to_string(),
                })
            })
            .collect();
        let first_files: Vec<FileAttachment> = task
            .messages
            .iter()
            .flat_map(|m| m.files.iter().map(unstage_file))
            .collect();
        // Group chats reply to the chat; direct messages to the peer.
        let target = if chat_id.is_empty() { peer_id.clone() } else { chat_id.clone() };
        // Resume the turn counter for tasks revived after a crash.
        let mut turn: u32 = task.turns;
        if turn > 0 {
            info!(task_id = %task_id, resume_turn = turn, "task queue worker: resuming /task after recovery");
        }
        let mut next_text = first_text;
        let mut next_images = first_images;
        let mut next_files = first_files;
        // Whether the latest reply actually reached the user; gates mark_notified.
        let mut last_send_ok = false;
        loop {
            turn += 1;
            let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
            let msg = AgentMessage {
                session_key: session_key.clone(),
                text: next_text,
                channel: channel_name.clone(),
                peer_id: peer_id.clone(),
                chat_id: chat_id.clone(),
                reply_tx,
                extra_tools: vec![],
                images: next_images,
                files: next_files,
                // NOTE(review): the originating account is not forwarded to
                // the agent — confirm this is intentional.
                account: None,
            };
            info!(task_id = %task_id, turn, "task queue worker: agent turn");
            if handle.tx.send(msg).await.is_err() {
                error!(task_id = %task_id, "task queue worker: agent channel closed");
                if let Err(fe) = self.manager.fail(&task_id, "agent channel closed", task.max_retries) {
                    error!(task_id = %task_id, "task queue worker: fail() error: {fe:#}");
                }
                break;
            }
            // 2700s = 45 minutes budget per agent turn.
            let reply = match tokio::time::timeout(Duration::from_secs(2700), reply_rx).await {
                Ok(Ok(r)) => r,
                Ok(Err(_)) => {
                    error!(task_id = %task_id, turn, "task queue worker: reply channel dropped");
                    self.notify_user_failure(&channel_name, account.as_deref(), &target, is_group, reply_to.clone(), turn, "reply channel dropped").await;
                    // Staged files are removed only once the task is Dead;
                    // earlier failures keep them so a retry can re-attach.
                    match self.manager.fail(&task_id, "reply channel dropped", task.max_retries) {
                        Ok(TaskStatus::Dead) => cleanup_staged_files(&task),
                        Err(fe) => error!(task_id = %task_id, "fail() error: {fe:#}"),
                        _ => {}
                    }
                    break;
                }
                Err(_) => {
                    error!(task_id = %task_id, turn, "task queue worker: reply timeout (2700s)");
                    self.notify_user_failure(&channel_name, account.as_deref(), &target, is_group, reply_to.clone(), turn, "reply timeout (45m)").await;
                    match self.manager.fail(&task_id, "reply timeout", task.max_retries) {
                        Ok(TaskStatus::Dead) => cleanup_staged_files(&task),
                        Err(fe) => error!(task_id = %task_id, "fail() error: {fe:#}"),
                        _ => {}
                    }
                    break;
                }
            };
            let outcome = classify_outcome(&reply);
            let pending = reply.pending_analysis;
            let had_reply_payload = !reply.text.is_empty()
                || !reply.images.is_empty()
                || !reply.files.is_empty();
            if !reply.text.is_empty() {
                // Persist the latest reply so status queries can surface it.
                if let Err(e) = self.manager.record_last_reply(&task_id, &reply.text) {
                    tracing::warn!(task_id = %task_id, "record_last_reply failed: {e:#}");
                }
            }
            if had_reply_payload {
                // Per-turn idempotency key prevents double-delivery when a
                // task is re-processed after a crash mid-send.
                let idem_key = format!("task:{task_id}:turn:{turn}");
                let already_delivered = match self.manager.is_idem_delivered(&idem_key) {
                    Ok(v) => v,
                    Err(e) => {
                        warn!(task_id = %task_id, "is_idem_delivered failed: {e:#}");
                        false
                    }
                };
                if already_delivered {
                    info!(
                        task_id = %task_id, turn,
                        "task queue worker: turn reply already delivered, skipping channel send"
                    );
                    last_send_ok = true;
                } else {
                    let out = OutboundMessage {
                        target_id: target.clone(),
                        is_group,
                        text: reply.text.clone(),
                        // Only quote the original message on the first turn.
                        reply_to: if turn == 1 { reply_to.clone() } else { None },
                        images: reply.images.clone(),
                        files: reply.files.clone(),
                        channel: Some(channel_name.clone()),
                        account: account.clone(),
                    };
                    if let Some(tx) = self.channel_tx_for(&channel_name, account.as_deref()) {
                        match tx.send(out).await {
                            Ok(_) => {
                                last_send_ok = true;
                                // Record delivery only after a successful send.
                                if let Err(e) = self.manager.mark_idem_delivered(&idem_key) {
                                    warn!(task_id = %task_id, "mark_idem_delivered failed: {e:#}");
                                }
                            }
                            Err(e) => {
                                last_send_ok = false;
                                error!(task_id = %task_id, "send reply failed: {e}");
                            }
                        }
                    } else {
                        last_send_ok = false;
                        tracing::warn!(
                            task_id = %task_id,
                            channel = %channel_name,
                            "no channel sender registered, reply dropped"
                        );
                    }
                }
            }
            // Hand any deferred analysis produced by the agent to the gateway.
            if let Some(analysis) = pending {
                if let Some(tx) = self.channel_tx_for(&channel_name, account.as_deref()) {
                    crate::gateway::startup::handle_pending_analysis(
                        analysis,
                        Arc::clone(&handle),
                        &tx,
                        target.clone(),
                        is_group,
                        &self.config,
                    )
                    .await;
                }
            }
            info!(task_id = %task_id, turn, outcome = ?outcome, "task queue worker: turn outcome");
            // Persist progress so a crash resumes from the right turn.
            if let Err(e) = self.manager.record_turn(&task_id, turn) {
                tracing::warn!(task_id = %task_id, "record_turn failed: {e:#}");
            }
            match outcome {
                TaskOutcome::Done => {
                    info!(task_id = %task_id, turn, "task queue worker: task completed");
                    if let Err(e) = self.manager.complete(&task_id) {
                        error!(task_id = %task_id, "complete() error: {e:#}");
                    }
                    // Mark notified only if the user actually saw the reply;
                    // otherwise it stays visible to list_pending_notifications.
                    if last_send_ok {
                        if let Err(e) = self.manager.mark_notified(&task_id) {
                            error!(task_id = %task_id, "mark_notified() error: {e:#}");
                        }
                    }
                    cleanup_staged_files(&task);
                    break;
                }
                TaskOutcome::Partial | TaskOutcome::Stuck(_) | TaskOutcome::Error(_) => {
                    // max_turns == 0 is chat mode: never auto-continue.
                    if max_turns == 0 || turn >= max_turns {
                        info!(
                            task_id = %task_id, turn, max_turns,
                            "task queue worker: max turns reached, marking done"
                        );
                        if let Err(e) = self.manager.complete(&task_id) {
                            error!(task_id = %task_id, "complete() error: {e:#}");
                        }
                        if last_send_ok {
                            if let Err(e) = self.manager.mark_notified(&task_id) {
                                error!(task_id = %task_id, "mark_notified() error: {e:#}");
                            }
                        }
                        cleanup_staged_files(&task);
                        break;
                    }
                    let prompt = continuation_prompt(&outcome, turn);
                    info!(task_id = %task_id, turn, "task queue worker: auto-continue");
                    // Follow-up turns carry only the synthetic prompt;
                    // attachments were already delivered on the first turn.
                    next_text = prompt;
                    next_images = vec![];
                    next_files = vec![];
                    // Brief backoff before retrying after a transient error.
                    if matches!(outcome, TaskOutcome::Error(_)) {
                        tokio::time::sleep(Duration::from_secs(5)).await;
                    }
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
    //! Unit coverage for `/task` flag parsing and unicode-dash normalization.
    use super::*;

    // Short flags: -n sets turns, TTL stays at its default.
    #[test]
    fn parse_task_prefix_short_flags() {
        let mut text = "/task -n 20 fix the login bug".to_string();
        let (turns, ttl) = parse_task_prefix(&mut text);
        assert_eq!(turns, 20);
        assert_eq!(ttl, TASK_DEFAULT_TTL_SECS);
        assert_eq!(text, "fix the login bug");
    }

    // Both short flags together.
    #[test]
    fn parse_task_prefix_short_flags_combined() {
        let mut text = "/task -n 50 -t 4h refactor payments".to_string();
        let (turns, ttl) = parse_task_prefix(&mut text);
        assert_eq!(turns, 50);
        assert_eq!(ttl, 4 * 3600);
        assert_eq!(text, "refactor payments");
    }

    // Long-form flags remain supported alongside the short ones.
    #[test]
    fn parse_task_prefix_long_flags_still_work() {
        let mut text = "/task --turns 30 --timeout 2h work".to_string();
        let (turns, ttl) = parse_task_prefix(&mut text);
        assert_eq!(turns, 30);
        assert_eq!(ttl, 2 * 3600);
        assert_eq!(text, "work");
    }

    // Em-dashes (e.g. from mobile keyboards) are normalized to "--".
    #[test]
    fn parse_task_prefix_em_dash_normalized() {
        let mut text = "/task \u{2014}turns 25 \u{2014}timeout 30m do x".to_string();
        let (turns, ttl) = parse_task_prefix(&mut text);
        assert_eq!(turns, 25);
        assert_eq!(ttl, 30 * 60);
        assert_eq!(text, "do x");
    }

    // No "/task" prefix → chat mode (0 turns).
    #[test]
    fn parse_task_prefix_no_task_prefix_chat_mode() {
        let mut text = "hello there".to_string();
        let (turns, _ttl) = parse_task_prefix(&mut text);
        assert_eq!(turns, 0);
    }

    // "-n" with a non-numeric follower is kept as message text; the task
    // still gets the default turn budget.
    #[test]
    fn parse_task_prefix_n_without_value_kept_as_text() {
        let mut text = "/task -n investigate logs".to_string();
        let (turns, _ttl) = parse_task_prefix(&mut text);
        assert_eq!(turns, TASK_DEFAULT_MAX_TURNS);
        assert_eq!(text, "-n investigate logs");
    }
}