use std::sync::Arc;
use std::time::Duration;
use anyhow::{Result, anyhow};
use serde_json::json;
use tokio::sync::{Semaphore, broadcast};
use tracing::{debug, error, info, warn};
use super::external_jobs::{
ExternalJob, ExternalJobKind, ExternalJobStatus, PollOutcome,
};
use super::shutdown::ShutdownCoordinator;
use crate::channel::OutboundMessage;
use crate::config::runtime::RuntimeConfig;
use crate::store::RedbStore;
/// Seconds the worker loop sleeps between scans for due jobs.
const TICK_SECS: u64 = 5;
/// Retention window in seconds (24 hours) passed to the store's finished-job cleanup.
const FINISHED_RETENTION_SECS: i64 = 24 * 3600;
/// Number of permits in the semaphore that bounds concurrently processed jobs.
const MAX_CONCURRENT_OPS: usize = 8;
/// Seconds added to the current time to schedule the next attempt after a failed delivery.
const DELIVERY_RETRY_DELAY_SECS: u64 = 30;
/// Background worker that polls externally submitted generation jobs,
/// downloads finished artifacts and publishes the outcome on the
/// notification channel.
pub struct ExternalJobsWorker {
/// Store queried for due jobs and updated as each job changes state.
store: Arc<RedbStore>,
/// Broadcast channel on which success/failure messages are sent.
notification_tx: broadcast::Sender<OutboundMessage>,
/// Consulted for the drain signal; also hands out per-task work guards.
shutdown: ShutdownCoordinator,
/// Runtime configuration; read when resolving provider API keys.
config: Arc<RuntimeConfig>,
/// HTTP client shared by all provider polls and artifact downloads.
client: reqwest::Client,
/// Limits how many jobs are processed at the same time.
op_semaphore: Arc<Semaphore>,
}
impl ExternalJobsWorker {
pub fn new(
store: Arc<RedbStore>,
notification_tx: broadcast::Sender<OutboundMessage>,
shutdown: ShutdownCoordinator,
config: Arc<RuntimeConfig>,
) -> Self {
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()
.unwrap_or_default();
Self {
store,
notification_tx,
shutdown,
config,
client,
op_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_OPS)),
}
}
/// Runs the worker loop until the shutdown coordinator reports draining.
///
/// Each iteration:
/// 1. asks the store for jobs due at the current UTC timestamp and spawns one
///    task per job; a task waits for a permit from `op_semaphore`, calls
///    `process_job`, and holds a shutdown work guard until it is finished;
/// 2. on every 12th iteration asks the store to clean up finished jobs,
///    passing `FINISHED_RETENTION_SECS`;
/// 3. sleeps for `TICK_SECS`.
///
/// Ids of jobs that still have a task running are kept in a set local to this
/// loop and skipped on later ticks, so a job whose poll or download outlasts
/// one tick is not processed twice concurrently.
/// NOTE(review): this assumes `due_external_jobs` can return a job again
/// while an earlier task for it is still running — confirm against the store.
pub async fn run(self: Arc<Self>) {
    /// Ids of jobs that currently have a spawned task.
    type InFlight = Arc<std::sync::Mutex<std::collections::HashSet<String>>>;

    /// Removes its id from the in-flight set when dropped, so a task that
    /// panics or is cancelled cannot leave its job marked busy forever.
    struct InFlightClaim {
        set: InFlight,
        id: String,
    }

    impl Drop for InFlightClaim {
        fn drop(&mut self) {
            self.set
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner())
                .remove(&self.id);
        }
    }

    info!("external jobs worker started");
    let in_flight: InFlight = Arc::default();
    let mut gc_counter: u32 = 0;
    loop {
        if self.shutdown.is_draining() {
            info!("external jobs worker: drain signaled, stopping");
            break;
        }
        let now = chrono::Utc::now().timestamp();
        match self.store.due_external_jobs(now) {
            Ok(jobs) if !jobs.is_empty() => {
                debug!(count = jobs.len(), "external jobs: due tick");
                for job in jobs {
                    let id = job.id.to_string();
                    // The lock guard is a temporary of this statement, so it
                    // is never held across an await point.
                    let newly_claimed = in_flight
                        .lock()
                        .unwrap_or_else(|poisoned| poisoned.into_inner())
                        .insert(id.clone());
                    if !newly_claimed {
                        debug!(job_id = %job.id, "external jobs: still in flight, skipping");
                        continue;
                    }
                    let claim = InFlightClaim {
                        set: Arc::clone(&in_flight),
                        id,
                    };
                    let worker = Arc::clone(&self);
                    let guard = self.shutdown.begin_work();
                    let sem = Arc::clone(&self.op_semaphore);
                    tokio::spawn(async move {
                        // Owned by the task so the id is released however it ends.
                        let _claim = claim;
                        // A closed semaphore yields Err; the job is then skipped.
                        let permit = sem.acquire_owned().await;
                        if permit.is_ok() {
                            worker.process_job(job).await;
                        }
                        drop(guard);
                        drop(permit);
                    });
                }
            }
            Ok(_) => {}
            Err(e) => error!("external jobs: due query failed: {e:#}"),
        }
        gc_counter = gc_counter.wrapping_add(1);
        if gc_counter % 12 == 0 {
            if let Err(e) = self.store.cleanup_finished_external_jobs(FINISHED_RETENTION_SECS) {
                warn!("external jobs: cleanup_finished failed: {e:#}");
            }
        }
        tokio::time::sleep(Duration::from_secs(TICK_SECS)).await;
    }
    info!("external jobs worker exited");
}
/// Routes a due job to the right handler.
///
/// Jobs that report `needs_delivery()` get a delivery retry; otherwise jobs
/// in `Pending` or `Polling` state go through a poll cycle. Any other job is
/// left untouched.
async fn process_job(&self, job: ExternalJob) {
    if job.needs_delivery() {
        self.retry_delivery(job).await;
        return;
    }
    let awaiting_result = matches!(
        job.status,
        ExternalJobStatus::Pending | ExternalJobStatus::Polling
    );
    if awaiting_result {
        self.poll_cycle(job).await;
    }
}
/// Runs one poll cycle for a job that has not reached a terminal state.
///
/// A job that reports `is_timed_out()` is marked `TimedOut`, persisted and
/// handed to delivery. Otherwise the job is persisted as `Polling` with its
/// poll count incremented, the provider is queried, and the outcome applied:
///
/// - poll error: treated as transient — the error text is recorded and the
///   job is rescheduled as `Pending`;
/// - still pending: back to `Pending`, error cleared, next poll scheduled;
/// - provider-reported failure: `Failed`, then delivery of the failure notice;
/// - done: the artifact is downloaded; success gives `Done`, a download error
///   gives `Failed`, and delivery is attempted in both cases.
///
/// Store update errors are logged. Only a failure to persist the `Polling`
/// transition aborts the cycle.
async fn poll_cycle(&self, mut job: ExternalJob) {
    if job.is_timed_out() {
        job.status = ExternalJobStatus::TimedOut;
        let waited_secs = chrono::Utc::now().timestamp() - job.submitted_at;
        job.error = Some(format!("timed out after {}s", waited_secs));
        if let Err(e) = self.store.update_external_job(&job) {
            error!(job_id = %job.id, "update failed: {e:#}");
        }
        self.attempt_delivery(&mut job).await;
        return;
    }

    // Record the attempt before talking to the provider.
    job.poll_count += 1;
    job.status = ExternalJobStatus::Polling;
    if let Err(e) = self.store.update_external_job(&job) {
        error!(job_id = %job.id, "update (polling) failed: {e:#}");
        return;
    }

    match self.dispatch_poll(&job).await {
        Err(e) => {
            job.next_poll_at = chrono::Utc::now().timestamp() + job.next_poll_delay_secs() as i64;
            job.status = ExternalJobStatus::Pending;
            job.error = Some(format!("poll: {e:#}"));
            warn!(job_id = %job.id, error = %e, "external jobs: transient poll error");
            if let Err(e2) = self.store.update_external_job(&job) {
                error!(job_id = %job.id, "update (transient) failed: {e2:#}");
            }
        }
        Ok(PollOutcome::Pending) => {
            job.next_poll_at = chrono::Utc::now().timestamp() + job.next_poll_delay_secs() as i64;
            job.status = ExternalJobStatus::Pending;
            job.error = None;
            if let Err(e) = self.store.update_external_job(&job) {
                error!(job_id = %job.id, "update (pending) failed: {e:#}");
            }
        }
        Ok(PollOutcome::Failed(msg)) => {
            job.status = ExternalJobStatus::Failed;
            job.error = Some(msg);
            if let Err(e) = self.store.update_external_job(&job) {
                error!(job_id = %job.id, "update (failed) failed: {e:#}");
            }
            self.attempt_delivery(&mut job).await;
        }
        Ok(PollOutcome::Done(url)) => {
            let fetched = download_artifact(&self.client, &url, job.kind).await;
            job.result_url = Some(url);
            match fetched {
                Ok(local_path) => {
                    job.result_path = Some(local_path);
                    job.status = ExternalJobStatus::Done;
                    job.error = None;
                    if let Err(e) = self.store.update_external_job(&job) {
                        error!(job_id = %job.id, "update (done) failed: {e:#}");
                    }
                }
                Err(e) => {
                    job.status = ExternalJobStatus::Failed;
                    job.error = Some(format!("download: {e:#}"));
                    if let Err(e2) = self.store.update_external_job(&job) {
                        error!(job_id = %job.id, "update (download-fail) failed: {e2:#}");
                    }
                }
            }
            // Both download outcomes are terminal, so both notify the requester.
            self.attempt_delivery(&mut job).await;
        }
    }
}
/// Logs the retry (with the attempt count so far) and makes another
/// delivery attempt for the job.
async fn retry_delivery(&self, job: ExternalJob) {
    let mut pending = job;
    info!(
        job_id = %pending.id,
        attempts = pending.delivery_attempts,
        "external jobs: retrying delivery"
    );
    self.attempt_delivery(&mut pending).await;
}
/// Makes one delivery attempt and records the result on the job.
///
/// The attempt counter is bumped (saturating) first. `Done` jobs are sent as
/// a success message, every other status as a failure message.
///
/// - On a failed send the job's `next_poll_at` is moved
///   `DELIVERY_RETRY_DELAY_SECS` into the future and the job is persisted.
/// - On a successful send `delivered_at` is stamped, the job is persisted and
///   a summary message is appended to the originating session.
///
/// Store and write-back errors are only logged.
async fn attempt_delivery(&self, job: &mut ExternalJob) {
    job.delivery_attempts = job.delivery_attempts.saturating_add(1);

    let is_done = matches!(job.status, ExternalJobStatus::Done);
    let delivered = if is_done {
        self.deliver_success(job).await
    } else {
        self.deliver_failure(job).await
    };

    if !delivered {
        job.next_poll_at = chrono::Utc::now().timestamp() + DELIVERY_RETRY_DELAY_SECS as i64;
        warn!(
            job_id = %job.id,
            attempts = job.delivery_attempts,
            "external jobs: delivery failed, will retry"
        );
        if let Err(e) = self.store.update_external_job(job) {
            error!(job_id = %job.id, "update (delivery-retry) failed: {e:#}");
        }
        return;
    }

    job.delivered_at = Some(chrono::Utc::now().timestamp());
    if let Err(e) = self.store.update_external_job(job) {
        error!(job_id = %job.id, "update (delivered) failed: {e:#}");
    }
    if let Err(e) = self.write_back_to_session(job) {
        debug!(job_id = %job.id, "session writeback skipped: {e:#}");
    }
}
/// Appends an assistant message summarising the job outcome to the job's
/// session in the store.
///
/// For `Done` jobs the message carries the local result path (empty when the
/// job has none); for any other status it carries an outcome word and the
/// recorded error text (empty when absent).
///
/// # Errors
/// Returns an error when the store's `append_message` fails.
fn write_back_to_session(&self, job: &ExternalJob) -> Result<()> {
    let kind_label = match job.kind {
        ExternalJobKind::VideoGen => "video",
        ExternalJobKind::ImageGen => "image",
    };

    let finished_ok = matches!(job.status, ExternalJobStatus::Done);
    let content = if !finished_ok {
        let outcome = match job.status {
            ExternalJobStatus::Failed => "failed",
            ExternalJobStatus::TimedOut => "timed out",
            _ => "ended",
        };
        let detail = job.error.as_deref().unwrap_or("");
        format!("[{kind_label} generation {}] {}", outcome, detail)
    } else {
        let path = job.result_path.as_deref().unwrap_or("");
        format!("[{kind_label} generation complete] {path}")
    };

    let msg = json!({
        "role": "assistant",
        "content": content,
        "external_job_id": job.id,
    });

    match self.store.append_message(&job.session_key, &msg) {
        Ok(_) => Ok(()),
        Err(e) => Err(anyhow!("append_message: {e}")),
    }
}
/// Polls the provider named on the job for the state of its external task.
///
/// Supported provider names are `"seedance"`, `"minimax"` and `"kling"`.
///
/// # Errors
/// Returns an error when the provider's credentials cannot be resolved, when
/// the provider has no polling adapter, or when the adapter itself fails.
async fn dispatch_poll(&self, job: &ExternalJob) -> Result<PollOutcome> {
    match job.provider.as_str() {
        "seedance" => {
            let key = match self.resolve_provider_key("doubao", "ARK_API_KEY") {
                Some(found) => found,
                None => return Err(anyhow!("seedance: no API key configured")),
            };
            poll_seedance(&self.client, &key, &job.external_task_id).await
        }
        "minimax" => {
            let key = match self.resolve_provider_key("minimax", "MINIMAX_API_KEY") {
                Some(found) => found,
                None => return Err(anyhow!("minimax: no API key configured")),
            };
            poll_minimax(&self.client, &key, &job.external_task_id).await
        }
        "kling" => {
            let (ak, sk) = match self.resolve_kling_keys() {
                Some(pair) => pair,
                None => {
                    return Err(anyhow!(
                        "kling: KLING_ACCESS_KEY / KLING_SECRET_KEY not configured"
                    ))
                }
            };
            poll_kling(&self.client, &ak, &sk, &job.external_task_id).await
        }
        other => Err(anyhow!("no async polling adapter for provider: {other}")),
    }
}
/// Looks up an API key for `provider`.
///
/// The plain-text `api_key` of the provider entry in the runtime
/// configuration wins; when the configuration yields nothing, the
/// environment variable `env_var` is read instead. Returns `None` when
/// neither source provides a value.
fn resolve_provider_key(&self, provider: &str, env_var: &str) -> Option<String> {
    let configured = self
        .config
        .model
        .models
        .as_ref()
        .and_then(|m| m.providers.get(provider))
        .and_then(|p| p.api_key.as_ref())
        .and_then(|k| k.as_plain().map(str::to_owned));

    match configured {
        Some(key) => Some(key),
        // The environment is consulted only when the config has no key.
        None => std::env::var(env_var).ok(),
    }
}
/// Resolves the Kling `(access_key, secret_key)` pair.
///
/// The access key comes from `resolve_provider_key("kling", "KLING_ACCESS_KEY")`;
/// the secret key is read from the `KLING_SECRET_KEY` environment variable,
/// and only after an access key was found. Returns `None` when either is
/// missing.
fn resolve_kling_keys(&self) -> Option<(String, String)> {
    self.resolve_provider_key("kling", "KLING_ACCESS_KEY")
        .and_then(|ak| {
            std::env::var("KLING_SECRET_KEY")
                .ok()
                .map(|sk| (ak, sk))
        })
}
/// Publishes the finished artifact on the notification channel.
///
/// The message text is the kind label plus the first 80 characters of the
/// prompt, and the artifact is attached as a single file entry of
/// `(file name, MIME type, local path)`. Returns `true` when the broadcast
/// send succeeded; a send error is logged and yields `false`.
async fn deliver_success(&self, job: &ExternalJob) -> bool {
    let (kind_label, mime) = match job.kind {
        ExternalJobKind::VideoGen => ("video", "video/mp4"),
        ExternalJobKind::ImageGen => ("image", "image/png"),
    };
    let path = job.result_path.as_deref().unwrap_or("");
    // Fall back to the whole path when it has no usable final component.
    let filename = match std::path::Path::new(path)
        .file_name()
        .and_then(|s| s.to_str())
    {
        Some(name) => name,
        None => path,
    };
    let prompt_preview: String = job.prompt.chars().take(80).collect();

    let attachment = (filename.to_string(), mime.to_string(), path.to_string());
    let out = OutboundMessage {
        target_id: job.delivery.target_id.clone(),
        is_group: job.delivery.is_group,
        text: format!("[{kind_label}] {prompt_preview}"),
        reply_to: job.delivery.reply_to.clone(),
        images: vec![],
        files: vec![attachment],
        channel: Some(job.delivery.channel.clone()),
        account: None,
    };

    if let Err(e) = self.notification_tx.send(out) {
        warn!(job_id = %job.id, "deliver_success: notification_tx failed: {e}");
        return false;
    }
    true
}
/// Publishes a failure notice for the job on the notification channel.
///
/// The text names the kind, the first 80 characters of the prompt and, on a
/// second line, the recorded error (or `"unknown error"`). No files are
/// attached. Returns `true` when the broadcast send succeeded; a send error
/// is logged and yields `false`.
async fn deliver_failure(&self, job: &ExternalJob) -> bool {
    let prompt_preview: String = job.prompt.chars().take(80).collect();
    let reason = job.error.as_deref().unwrap_or("unknown error");
    let kind_label = match job.kind {
        ExternalJobKind::VideoGen => "video",
        ExternalJobKind::ImageGen => "image",
    };

    let out = OutboundMessage {
        target_id: job.delivery.target_id.clone(),
        is_group: job.delivery.is_group,
        text: format!("[{kind_label}] generation failed for: {prompt_preview}\n{reason}"),
        reply_to: job.delivery.reply_to.clone(),
        images: vec![],
        files: vec![],
        channel: Some(job.delivery.channel.clone()),
        account: None,
    };

    if let Err(e) = self.notification_tx.send(out) {
        warn!(job_id = %job.id, "deliver_failure: notification_tx failed: {e}");
        return false;
    }
    true
}
}
/// Base URL prefixed to the Seedance task submission and polling endpoints.
const SEEDANCE_BASE: &str = "https://ark.cn-beijing.volces.com/api/v3";
/// Model id sent on Seedance submission when the caller passes no override.
const SEEDANCE_DEFAULT_MODEL: &str = "doubao-seedance-2-0-260128";
/// Submits a text-to-video task to Seedance and returns the provider task id.
///
/// `model_override` replaces `SEEDANCE_DEFAULT_MODEL` when given. The request
/// always sets `watermark` to `false`.
///
/// # Errors
/// Fails when the request cannot be sent, the response is not JSON, or the
/// response has no string `id` field.
pub async fn submit_seedance(
    client: &reqwest::Client,
    api_key: &str,
    prompt: &str,
    duration: u64,
    aspect_ratio: &str,
    model_override: Option<&str>,
) -> Result<String> {
    let payload = json!({
        "model": model_override.unwrap_or(SEEDANCE_DEFAULT_MODEL),
        "content": [{"type": "text", "text": prompt}],
        "ratio": aspect_ratio,
        "duration": duration,
        "watermark": false,
    });

    let http_resp = client
        .post(format!("{SEEDANCE_BASE}/contents/generations/tasks"))
        .bearer_auth(api_key)
        .json(&payload)
        .send()
        .await
        .map_err(|e| anyhow!("seedance: submit failed: {e}"))?;
    let resp: serde_json::Value = http_resp
        .json()
        .await
        .map_err(|e| anyhow!("seedance: submit parse failed: {e}"))?;

    match resp["id"].as_str() {
        Some(id) => Ok(id.to_owned()),
        None => Err(anyhow!("seedance: no task id in response: {resp}")),
    }
}
/// Queries Seedance for the state of `task_id`.
///
/// Returns `Done(url)` for status `"succeeded"`, `Failed(..)` for `"failed"`
/// or `"cancelled"`, and `Pending` for any other status value.
///
/// # Errors
/// Fails when the request cannot be sent, when the server answers with a
/// non-success HTTP status (so authentication or not-found errors are
/// reported instead of being read as "still pending"), when the body is not
/// JSON, or when a succeeded task carries no video URL.
async fn poll_seedance(
    client: &reqwest::Client,
    api_key: &str,
    task_id: &str,
) -> Result<PollOutcome> {
    // Known locations of the result URL, tried in this order.
    const URL_POINTERS: [&str; 5] = [
        "/content/video_url",
        "/content/0/video_url/url",
        "/content/0/url",
        "/result/video_url/url",
        "/output/url",
    ];

    let http = client
        .get(format!("{SEEDANCE_BASE}/contents/generations/tasks/{task_id}"))
        .bearer_auth(api_key)
        .send()
        .await
        .map_err(|e| anyhow!("seedance: poll failed: {e}"))?;

    let http_status = http.status();
    if !http_status.is_success() {
        // Keep only a short excerpt; error bodies can be whole HTML pages.
        let body: String = http
            .text()
            .await
            .unwrap_or_default()
            .chars()
            .take(512)
            .collect();
        return Err(anyhow!("seedance: poll failed: HTTP {http_status}: {body}"));
    }

    let resp: serde_json::Value = http
        .json()
        .await
        .map_err(|e| anyhow!("seedance: poll parse failed: {e}"))?;

    let status = resp["status"].as_str().unwrap_or("unknown");
    match status {
        "succeeded" => {
            // Take the first location that actually holds a string, so an
            // object or null at an earlier location does not hide a later URL.
            let url = URL_POINTERS
                .iter()
                .find_map(|ptr| resp.pointer(ptr).and_then(|v| v.as_str()))
                .ok_or_else(|| anyhow!("seedance: no video URL in result: {resp}"))?
                .to_owned();
            Ok(PollOutcome::Done(url))
        }
        "failed" | "cancelled" => {
            let msg = resp["error"]["message"]
                .as_str()
                .or_else(|| resp["message"].as_str())
                .unwrap_or("task failed");
            Ok(PollOutcome::Failed(format!("{status}: {msg}")))
        }
        _ => Ok(PollOutcome::Pending),
    }
}
/// Base URL prefixed to the MiniMax submission, query and file-retrieve endpoints.
const MINIMAX_BASE: &str = "https://api.minimaxi.com/v1";
/// Model id sent on MiniMax submission when the caller passes no override.
const MINIMAX_DEFAULT_MODEL: &str = "video-01-director";
/// Maps an aspect-ratio string to the resolution string sent to MiniMax.
///
/// `"9:16"` gives `"720x1280"`, `"1:1"` gives `"720x720"`; every other
/// input (matched exactly, no trimming) gives `"1280x720"`.
fn minimax_resolution(aspect_ratio: &str) -> &'static str {
    const PRESETS: [(&str, &str); 2] = [("9:16", "720x1280"), ("1:1", "720x720")];
    PRESETS
        .iter()
        .find(|entry| entry.0 == aspect_ratio)
        .map_or("1280x720", |entry| entry.1)
}
/// Submits a video generation task to MiniMax and returns the provider task id.
///
/// `model_override` replaces `MINIMAX_DEFAULT_MODEL` when given; the aspect
/// ratio is translated to a resolution string by `minimax_resolution`.
///
/// # Errors
/// Fails when the request cannot be sent, the response is not JSON, or the
/// response has no string `task_id` field.
pub async fn submit_minimax(
    client: &reqwest::Client,
    api_key: &str,
    prompt: &str,
    duration: u64,
    aspect_ratio: &str,
    model_override: Option<&str>,
) -> Result<String> {
    let payload = json!({
        "prompt": prompt,
        "model": model_override.unwrap_or(MINIMAX_DEFAULT_MODEL),
        "duration": duration,
        "resolution": minimax_resolution(aspect_ratio),
    });

    let http_resp = client
        .post(format!("{MINIMAX_BASE}/video_generation"))
        .bearer_auth(api_key)
        .json(&payload)
        .send()
        .await
        .map_err(|e| anyhow!("minimax: submit failed: {e}"))?;
    let resp: serde_json::Value = http_resp
        .json()
        .await
        .map_err(|e| anyhow!("minimax: submit parse failed: {e}"))?;

    match resp["task_id"].as_str() {
        Some(id) => Ok(id.to_owned()),
        None => Err(anyhow!("minimax: no task_id in response: {resp}")),
    }
}
/// Queries MiniMax for the state of `task_id`.
///
/// Status `"Success"` triggers a second request that resolves the task's
/// `file_id` to a download URL and returns `Done(url)`; `"Fail"` returns
/// `Failed(..)`; any other status returns `Pending`.
///
/// The status and file id are read from `/task/status` and `/task/file_id`
/// first, then from top-level `/status` and `/file_id`.
/// NOTE(review): the provider's published examples appear to use the flat
/// top-level layout — confirm which layout the endpoint really returns.
///
/// # Errors
/// Fails when a request cannot be sent, when the server answers with a
/// non-success HTTP status (so such errors are reported instead of being read
/// as "still pending"), when a body is not JSON, or when a successful task
/// lacks a `file_id` or download URL.
async fn poll_minimax(
    client: &reqwest::Client,
    api_key: &str,
    task_id: &str,
) -> Result<PollOutcome> {
    /// Rejects non-2xx responses, otherwise decodes the body as JSON.
    /// `what` names the request in error messages.
    async fn read_json(resp: reqwest::Response, what: &str) -> Result<serde_json::Value> {
        let code = resp.status();
        if !code.is_success() {
            // Keep only a short excerpt; error bodies can be whole HTML pages.
            let body: String = resp
                .text()
                .await
                .unwrap_or_default()
                .chars()
                .take(512)
                .collect();
            return Err(anyhow!("minimax: {what} failed: HTTP {code}: {body}"));
        }
        resp.json::<serde_json::Value>()
            .await
            .map_err(|e| anyhow!("minimax: {what} parse failed: {e}"))
    }

    /// Returns the first of `pointers` that resolves to a JSON string.
    fn first_str<'a>(doc: &'a serde_json::Value, pointers: &[&str]) -> Option<&'a str> {
        pointers
            .iter()
            .find_map(|ptr| doc.pointer(ptr).and_then(|v| v.as_str()))
    }

    let poll_resp = client
        .get(format!("{MINIMAX_BASE}/query/video_generation"))
        .bearer_auth(api_key)
        .query(&[("task_id", task_id)])
        .send()
        .await
        .map_err(|e| anyhow!("minimax: poll failed: {e}"))?;
    let poll = read_json(poll_resp, "poll").await?;

    let status = first_str(&poll, &["/task/status", "/status"]).unwrap_or("unknown");
    match status {
        "Success" => {
            let file_id = first_str(&poll, &["/task/file_id", "/file_id"])
                .ok_or_else(|| anyhow!("minimax: no file_id in result: {poll}"))?
                .to_owned();
            let file_http = client
                .get(format!("{MINIMAX_BASE}/files/retrieve"))
                .bearer_auth(api_key)
                .query(&[("file_id", file_id.as_str())])
                .send()
                .await
                .map_err(|e| anyhow!("minimax: file retrieve failed: {e}"))?;
            let file_resp = read_json(file_http, "file retrieve").await?;
            let url = file_resp
                .pointer("/file/download_url")
                .and_then(|v| v.as_str())
                .ok_or_else(|| anyhow!("minimax: no download_url: {file_resp}"))?
                .to_owned();
            Ok(PollOutcome::Done(url))
        }
        "Fail" => Ok(PollOutcome::Failed(format!("minimax task {task_id} failed"))),
        _ => Ok(PollOutcome::Pending),
    }
}
/// Base URL prefixed to the Kling text-to-video submission and polling endpoints.
const KLING_BASE: &str = "https://api.klingai.com";
/// Model name sent on Kling submission when the caller passes no override.
const KLING_DEFAULT_MODEL: &str = "kling-v2-master";
/// Submits a text-to-video task to Kling and returns the provider task id.
///
/// The request is authorised with a JWT built from the access/secret key
/// pair. `model_override` replaces `KLING_DEFAULT_MODEL` when given, and the
/// duration is sent as a decimal string.
///
/// # Errors
/// Fails when the JWT cannot be built, the request cannot be sent, the
/// response is not JSON, or the response has no string `data.task_id`.
pub async fn submit_kling(
    client: &reqwest::Client,
    access_key: &str,
    secret_key: &str,
    prompt: &str,
    duration: u64,
    aspect_ratio: &str,
    model_override: Option<&str>,
) -> Result<String> {
    let jwt = kling_jwt(access_key, secret_key)?;
    let payload = json!({
        "model_name": model_override.unwrap_or(KLING_DEFAULT_MODEL),
        "prompt": prompt,
        "duration": duration.to_string(),
        "aspect_ratio": aspect_ratio,
    });

    let http_resp = client
        .post(format!("{KLING_BASE}/v1/videos/text2video"))
        .bearer_auth(&jwt)
        .json(&payload)
        .send()
        .await
        .map_err(|e| anyhow!("kling: submit failed: {e}"))?;
    let resp: serde_json::Value = http_resp
        .json()
        .await
        .map_err(|e| anyhow!("kling: submit parse failed: {e}"))?;

    match resp.pointer("/data/task_id").and_then(|v| v.as_str()) {
        Some(id) => Ok(id.to_owned()),
        None => Err(anyhow!("kling: no task_id in response: {resp}")),
    }
}
/// Queries Kling for the state of `task_id`.
///
/// Returns `Done(url)` for task status `"succeed"` (URL of the first result
/// video), `Failed(..)` for `"failed"`, and `Pending` for any other status.
///
/// # Errors
/// Fails when the JWT cannot be built, the request cannot be sent, the server
/// answers with a non-success HTTP status (so authentication or not-found
/// errors are reported instead of being read as "still pending"), the body is
/// not JSON, or a succeeded task carries no video URL.
async fn poll_kling(
    client: &reqwest::Client,
    access_key: &str,
    secret_key: &str,
    task_id: &str,
) -> Result<PollOutcome> {
    let jwt = kling_jwt(access_key, secret_key)?;
    let http = client
        .get(format!("{KLING_BASE}/v1/videos/text2video/{task_id}"))
        .bearer_auth(&jwt)
        .send()
        .await
        .map_err(|e| anyhow!("kling: poll failed: {e}"))?;

    let http_status = http.status();
    if !http_status.is_success() {
        // Keep only a short excerpt; error bodies can be whole HTML pages.
        let body: String = http
            .text()
            .await
            .unwrap_or_default()
            .chars()
            .take(512)
            .collect();
        return Err(anyhow!("kling: poll failed: HTTP {http_status}: {body}"));
    }

    let poll: serde_json::Value = http
        .json()
        .await
        .map_err(|e| anyhow!("kling: poll parse failed: {e}"))?;

    let status = poll
        .pointer("/data/task_status")
        .and_then(|v| v.as_str())
        .unwrap_or("unknown");
    match status {
        "succeed" => {
            let url = poll
                .pointer("/data/task_result/videos/0/url")
                .and_then(|v| v.as_str())
                .ok_or_else(|| anyhow!("kling: no video URL in result: {poll}"))?
                .to_owned();
            Ok(PollOutcome::Done(url))
        }
        "failed" => {
            let msg = poll
                .pointer("/data/task_status_msg")
                .and_then(|v| v.as_str())
                .unwrap_or("task failed");
            Ok(PollOutcome::Failed(format!("kling: {msg}")))
        }
        _ => Ok(PollOutcome::Pending),
    }
}
fn kling_jwt(access_key: &str, secret_key: &str) -> Result<String> {
use base64::Engine;
use hmac::{Hmac, Mac};
use sha2::Sha256;
let now = chrono::Utc::now().timestamp();
let header = base64::engine::general_purpose::URL_SAFE_NO_PAD
.encode(r#"{"alg":"HS256","typ":"JWT"}"#);
let payload_json = format!(
r#"{{"iss":"{access_key}","exp":{},"nbf":{}}}"#,
now + 1800,
now - 5
);
let payload = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(&payload_json);
let signing_input = format!("{header}.{payload}");
let mut mac = Hmac::<Sha256>::new_from_slice(secret_key.as_bytes())
.map_err(|e| anyhow!("kling_jwt: invalid key: {e}"))?;
mac.update(signing_input.as_bytes());
let sig = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(mac.finalize().into_bytes());
Ok(format!("{signing_input}.{sig}"))
}
/// Downloads the artifact at `url` and stores it under the user's download
/// directory, returning the local path as a string.
///
/// The request uses a 120-second timeout. The file goes into
/// `<downloads>/rsclaw/<category>/`, where `<downloads>` is the platform
/// download directory, or `Downloads` under the home directory (or the
/// configured base directory) when that is unavailable. The file name is
/// `dl_<kind>_<YYYYmmddHHMM><3 random lowercase letters>.<ext>`, with `mp4`
/// for video and `png` for image jobs.
///
/// # Errors
/// Fails when the request cannot be sent, when the server answers with a
/// non-success HTTP status or an empty body (so an error page or empty
/// response is never saved and delivered as the artifact), when the body
/// cannot be read, or when the directory or file cannot be written.
async fn download_artifact(
    client: &reqwest::Client,
    url: &str,
    kind: ExternalJobKind,
) -> Result<String> {
    let bytes = client
        .get(url)
        .timeout(Duration::from_secs(120))
        .send()
        .await
        .map_err(|e| anyhow!("download: {e}"))?
        .error_for_status()
        .map_err(|e| anyhow!("download: {e}"))?
        .bytes()
        .await
        .map_err(|e| anyhow!("download read: {e}"))?;
    if bytes.is_empty() {
        return Err(anyhow!("download: empty response body"));
    }
    let ext = match kind {
        ExternalJobKind::VideoGen => "mp4",
        ExternalJobKind::ImageGen => "png",
    };
    let kind_letter = crate::channel::kind_from_extension(ext);
    let category = crate::channel::category_for_kind(kind_letter);
    let dir = dirs_next::download_dir()
        .unwrap_or_else(|| {
            dirs_next::home_dir()
                .unwrap_or_else(crate::config::loader::base_dir)
                .join("Downloads")
        })
        .join("rsclaw")
        .join(category);
    tokio::fs::create_dir_all(&dir)
        .await
        .map_err(|e| anyhow!("download: create_dir: {e}"))?;
    // Minute-resolution timestamp plus three random letters keeps names
    // distinct for downloads that land in the same minute.
    let ts = chrono::Local::now().format("%Y%m%d%H%M").to_string();
    let abc: String = (0..3)
        .map(|_| (rand::random::<u8>() % 26 + b'a') as char)
        .collect();
    let path = dir.join(format!("dl_{kind_letter}_{ts}{abc}.{ext}"));
    tokio::fs::write(&path, &bytes)
        .await
        .map_err(|e| anyhow!("download: write: {e}"))?;
    Ok(path.to_string_lossy().to_string())
}