use std::{sync::Arc, time::Duration};
use anyhow::{Result, anyhow};
use serde_json::json;
use tokio::sync::{Semaphore, broadcast};
use tracing::{debug, error, info, warn};
use super::{
external_jobs::{ExternalJob, ExternalJobKind, ExternalJobStatus, PollOutcome},
shutdown::ShutdownCoordinator,
};
use rsclaw_channel::OutboundMessage;
use rsclaw_config::runtime::RuntimeConfig;
use rsclaw_store::RedbStore;
/// Seconds slept between scheduler ticks of the worker loop.
const TICK_SECS: u64 = 5;
/// Retention window (24 h, in seconds) passed to `cleanup_finished_external_jobs`.
const FINISHED_RETENTION_SECS: i64 = 60 * 60 * 24;
/// Number of permits in the semaphore that bounds concurrently processed jobs.
const MAX_CONCURRENT_OPS: usize = 8;
/// Delay, in seconds, before a failed delivery becomes due again.
const DELIVERY_RETRY_DELAY_SECS: u64 = 30;
/// Worker that polls external generation jobs (video / image) and notifies
/// the delivery target recorded on each job once the job finishes or fails.
// NOTE: field order is kept as-is on purpose; it determines drop order.
pub struct ExternalJobsWorker {
    // Job store: source of due jobs and sink for every state update.
    store: Arc<RedbStore>,
    // Outbound channel used to send result / failure notifications.
    notification_tx: broadcast::Sender<OutboundMessage>,
    // Reports draining and hands out per-task work guards.
    shutdown: ShutdownCoordinator,
    // Runtime config consulted for provider API keys and base URLs.
    config: Arc<RuntimeConfig>,
    // Shared HTTP client for provider polling and artifact downloads.
    client: reqwest::Client,
    // Bounds how many jobs are processed concurrently (MAX_CONCURRENT_OPS permits).
    op_semaphore: Arc<Semaphore>,
}
impl ExternalJobsWorker {
pub fn new(
store: Arc<RedbStore>,
notification_tx: broadcast::Sender<OutboundMessage>,
shutdown: ShutdownCoordinator,
config: Arc<RuntimeConfig>,
) -> Self {
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()
.unwrap_or_default();
Self {
store,
notification_tx,
shutdown,
config,
client,
op_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_OPS)),
}
}
pub async fn run(self: Arc<Self>) {
info!("external jobs worker started");
let mut gc_counter: u32 = 0;
loop {
if self.shutdown.is_draining() {
info!("external jobs worker: drain signaled, stopping");
break;
}
let now = chrono::Utc::now().timestamp();
match self.store.due_external_jobs(now) {
Ok(jobs) if !jobs.is_empty() => {
debug!(count = jobs.len(), "external jobs: due tick");
for job in jobs {
let worker = Arc::clone(&self);
let guard = self.shutdown.begin_work();
let sem = Arc::clone(&self.op_semaphore);
tokio::spawn(async move {
let _permit = match sem.acquire_owned().await {
Ok(p) => p,
Err(_) => {
drop(guard);
return;
}
};
worker.process_job(job).await;
drop(guard);
});
}
}
Ok(_) => {}
Err(e) => error!("external jobs: due query failed: {e:#}"),
}
gc_counter = gc_counter.wrapping_add(1);
if gc_counter % 12 == 0 {
if let Err(e) = self
.store
.cleanup_finished_external_jobs(FINISHED_RETENTION_SECS)
{
warn!("external jobs: cleanup_finished failed: {e:#}");
}
}
tokio::time::sleep(Duration::from_secs(TICK_SECS)).await;
}
info!("external jobs worker exited");
}
async fn process_job(&self, job: ExternalJob) {
if job.needs_delivery() {
self.retry_delivery(job).await;
} else if matches!(
job.status,
ExternalJobStatus::Pending | ExternalJobStatus::Polling
) {
self.poll_cycle(job).await;
}
}
async fn poll_cycle(&self, mut job: ExternalJob) {
if job.is_timed_out() {
job.status = ExternalJobStatus::TimedOut;
job.error = Some(format!(
"timed out after {}s",
chrono::Utc::now().timestamp() - job.submitted_at
));
if let Err(e) = self.store.update_external_job(&job) {
error!(job_id = %job.id, "update failed: {e:#}");
}
self.attempt_delivery(&mut job).await;
return;
}
job.status = ExternalJobStatus::Polling;
job.poll_count += 1;
if let Err(e) = self.store.update_external_job(&job) {
error!(job_id = %job.id, "update (polling) failed: {e:#}");
return;
}
let outcome = self.dispatch_poll(&job).await;
match outcome {
Ok(PollOutcome::Pending) => {
let now = chrono::Utc::now().timestamp();
job.next_poll_at = now + job.next_poll_delay_secs() as i64;
job.status = ExternalJobStatus::Pending;
job.error = None;
if let Err(e) = self.store.update_external_job(&job) {
error!(job_id = %job.id, "update (pending) failed: {e:#}");
}
}
Ok(PollOutcome::Done(url)) => {
job.result_url = Some(url.clone());
let dl = if job.provider == "rsclaw" {
let key = self
.resolve_provider_key("rsclaw", "RSCLAW_API_KEY")
.unwrap_or_default();
rsclaw_jobs::download_artifact_authed(&url, &key, job.kind).await
} else {
rsclaw_jobs::download_artifact(&self.client, &url, job.kind).await
};
match dl {
Ok(local_path) => {
job.result_path = Some(local_path);
job.status = ExternalJobStatus::Done;
job.error = None;
if let Err(e) = self.store.update_external_job(&job) {
error!(job_id = %job.id, "update (done) failed: {e:#}");
}
self.attempt_delivery(&mut job).await;
}
Err(e) => {
job.status = ExternalJobStatus::Failed;
job.error = Some(format!("download: {e:#}"));
if let Err(e2) = self.store.update_external_job(&job) {
error!(job_id = %job.id, "update (download-fail) failed: {e2:#}");
}
self.attempt_delivery(&mut job).await;
}
}
}
Ok(PollOutcome::Failed(msg)) => {
job.status = ExternalJobStatus::Failed;
job.error = Some(msg);
if let Err(e) = self.store.update_external_job(&job) {
error!(job_id = %job.id, "update (failed) failed: {e:#}");
}
self.attempt_delivery(&mut job).await;
}
Err(e) => {
let now = chrono::Utc::now().timestamp();
job.next_poll_at = now + job.next_poll_delay_secs() as i64;
job.status = ExternalJobStatus::Pending;
job.error = Some(format!("poll: {e:#}"));
warn!(job_id = %job.id, error = %e, "external jobs: transient poll error");
if let Err(e2) = self.store.update_external_job(&job) {
error!(job_id = %job.id, "update (transient) failed: {e2:#}");
}
}
}
}
async fn retry_delivery(&self, mut job: ExternalJob) {
info!(
job_id = %job.id,
attempts = job.delivery_attempts,
"external jobs: retrying delivery"
);
self.attempt_delivery(&mut job).await;
}
async fn attempt_delivery(&self, job: &mut ExternalJob) {
job.delivery_attempts = job.delivery_attempts.saturating_add(1);
let success = if matches!(job.status, ExternalJobStatus::Done) {
self.deliver_success(job).await
} else {
self.deliver_failure(job).await
};
if success {
job.delivered_at = Some(chrono::Utc::now().timestamp());
if let Err(e) = self.store.update_external_job(job) {
error!(job_id = %job.id, "update (delivered) failed: {e:#}");
}
if let Err(e) = self.write_back_to_session(job) {
debug!(job_id = %job.id, "session writeback skipped: {e:#}");
}
} else {
let now = chrono::Utc::now().timestamp();
job.next_poll_at = now + DELIVERY_RETRY_DELAY_SECS as i64;
warn!(
job_id = %job.id,
attempts = job.delivery_attempts,
"external jobs: delivery failed, will retry"
);
if let Err(e) = self.store.update_external_job(job) {
error!(job_id = %job.id, "update (delivery-retry) failed: {e:#}");
}
}
}
fn write_back_to_session(&self, job: &ExternalJob) -> Result<()> {
let kind_label = match job.kind {
ExternalJobKind::VideoGen => "video",
ExternalJobKind::ImageGen => "image",
};
let path = job.result_path.as_deref().unwrap_or("");
let content = if matches!(job.status, ExternalJobStatus::Done) {
format!("[{kind_label} generation complete] {path}")
} else {
format!(
"[{kind_label} generation {}] {}",
match job.status {
ExternalJobStatus::Failed => "failed",
ExternalJobStatus::TimedOut => "timed out",
_ => "ended",
},
job.error.as_deref().unwrap_or("")
)
};
let msg = json!({
"role": "assistant",
"content": content,
"external_job_id": job.id,
});
self.store
.append_message(&job.session_key, &msg)
.map(|_| ())
.map_err(|e| anyhow!("append_message: {e}"))
}
async fn dispatch_poll(&self, job: &ExternalJob) -> Result<PollOutcome> {
match job.provider.as_str() {
"seedance" => {
let key = self
.resolve_provider_key("doubao", "ARK_API_KEY")
.ok_or_else(|| anyhow!("seedance: no API key configured"))?;
rsclaw_jobs::poll_seedance(&self.client, &key, &job.external_task_id).await
}
"minimax" => {
let key = self
.resolve_provider_key("minimax", "MINIMAX_API_KEY")
.ok_or_else(|| anyhow!("minimax: no API key configured"))?;
rsclaw_jobs::poll_minimax(&self.client, &key, &job.external_task_id).await
}
"kling" => {
let (ak, sk) = self.resolve_kling_keys().ok_or_else(|| {
anyhow!("kling: KLING_ACCESS_KEY / KLING_SECRET_KEY not configured")
})?;
rsclaw_jobs::poll_kling(&self.client, &ak, &sk, &job.external_task_id).await
}
"rsclaw" => {
let key = self
.resolve_provider_key("rsclaw", "RSCLAW_API_KEY")
.ok_or_else(|| anyhow!("rsclaw: no API key configured"))?;
rsclaw_jobs::poll_rsclaw(&key, &job.external_task_id).await
}
"agnes" => {
let key = self
.resolve_provider_key("agnes", "AGNES_API_KEY")
.ok_or_else(|| anyhow!("agnes: no API key configured"))?;
rsclaw_jobs::poll_agnes(&self.client, &key, &job.external_task_id).await
}
"openai" => {
let key = self
.resolve_provider_key("openai", "OPENAI_API_KEY")
.ok_or_else(|| anyhow!("openai: no API key configured"))?;
let base = self.resolve_provider_base_url("openai");
rsclaw_jobs::poll_openai_video(&self.client, &base, &key, &job.external_task_id)
.await
}
"rsclaw_image" => {
rsclaw_jobs::poll_rsclaw_image(&self.client, &job.external_task_id).await
}
other => Err(anyhow!("no async polling adapter for provider: {other}")),
}
}
fn resolve_provider_base_url(&self, provider: &str) -> String {
self.config
.model
.models
.as_ref()
.and_then(|m| m.providers.get(provider))
.and_then(|p| p.base_url.clone())
.unwrap_or_else(|| rsclaw_provider::defaults::resolve_base_url(provider).0)
}
fn resolve_provider_key(&self, provider: &str, env_var: &str) -> Option<String> {
self.config
.model
.models
.as_ref()
.and_then(|m| m.providers.get(provider))
.and_then(|p| p.api_key.as_ref())
.and_then(|k| k.as_plain().map(str::to_owned))
.or_else(|| std::env::var(env_var).ok())
}
fn resolve_kling_keys(&self) -> Option<(String, String)> {
let ak = self.resolve_provider_key("kling", "KLING_ACCESS_KEY")?;
let sk = std::env::var("KLING_SECRET_KEY").ok()?;
Some((ak, sk))
}
async fn deliver_success(&self, job: &ExternalJob) -> bool {
let path = job.result_path.as_deref().unwrap_or("");
let kind_label = match job.kind {
ExternalJobKind::VideoGen => "video",
ExternalJobKind::ImageGen => "image",
};
let filename = std::path::Path::new(path)
.file_name()
.and_then(|s| s.to_str())
.unwrap_or(path);
let mime = match job.kind {
ExternalJobKind::VideoGen => "video/mp4",
ExternalJobKind::ImageGen => "image/png",
};
let prompt_preview: String = job.prompt.chars().take(80).collect();
let out = OutboundMessage {
target_id: job.delivery.target_id.clone(),
is_group: job.delivery.is_group,
text: format!("[{kind_label}] {prompt_preview}"),
reply_to: job.delivery.reply_to.clone(),
images: vec![],
files: vec![(filename.to_string(), mime.to_string(), path.to_string())],
channel: Some(job.delivery.channel.clone()),
account: job.delivery.account.clone(),
};
match self.notification_tx.send(out) {
Ok(_) => true,
Err(e) => {
warn!(job_id = %job.id, "deliver_success: notification_tx failed: {e}");
false
}
}
}
async fn deliver_failure(&self, job: &ExternalJob) -> bool {
let kind_label = match job.kind {
ExternalJobKind::VideoGen => "video",
ExternalJobKind::ImageGen => "image",
};
let reason = job.error.as_deref().unwrap_or("unknown error");
let prompt_preview: String = job.prompt.chars().take(80).collect();
let text = format!("[{kind_label} failed] {prompt_preview}: {reason}");
let out = OutboundMessage {
target_id: job.delivery.target_id.clone(),
is_group: job.delivery.is_group,
text,
reply_to: job.delivery.reply_to.clone(),
images: vec![],
files: vec![],
channel: Some(job.delivery.channel.clone()),
account: job.delivery.account.clone(),
};
match self.notification_tx.send(out) {
Ok(_) => true,
Err(e) => {
warn!(job_id = %job.id, "deliver_failure: notification_tx failed: {e}");
false
}
}
}
}