rsclaw-runtime 2026.6.26

rsclaw composition root: AppState/RPC handlers (a2a, cmd, cron, gateway, hooks, server, ws) + process entry point
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
//! Background worker that drives `ExternalJob` rows to completion and
//! delivers the resulting artifact through the standard channel
//! notification path.
//!
//! Design lives in `external_jobs.rs`. This file holds the runtime loop
//! and the per-provider dispatch; the `poll_*` HTTP adapters it calls
//! are implemented in `rsclaw_jobs`. Adding a new async provider means:
//! extend the `match` in `dispatch_poll`, add the corresponding
//! `submit_*` for the tool side, and add the URL → Done outcome mapping.

use std::{sync::Arc, time::Duration};

use anyhow::{Result, anyhow};
use serde_json::json;
use tokio::sync::{Semaphore, broadcast};
use tracing::{debug, error, info, warn};

use super::{
    external_jobs::{ExternalJob, ExternalJobKind, ExternalJobStatus, PollOutcome},
    shutdown::ShutdownCoordinator,
};
use rsclaw_channel::OutboundMessage;
use rsclaw_config::runtime::RuntimeConfig;
use rsclaw_store::RedbStore;

/// Seconds the worker sleeps at the end of every tick (whether or not any
/// job was due) — small enough that new jobs start polling promptly, large
/// enough to keep redb scans cheap.
const TICK_SECS: u64 = 5;

/// Retention window for terminal jobs before they get GC'd, in seconds
/// (24 hours). Passed to `cleanup_finished_external_jobs`.
const FINISHED_RETENTION_SECS: i64 = 24 * 3600;

/// Number of permits in the worker's semaphore, i.e. the cap on per-job
/// operations (poll + delivery) running at once. Caps the thundering-herd
/// risk after a long restart window when many jobs become due
/// simultaneously. Provider rate limits typically tolerate this.
const MAX_CONCURRENT_OPS: usize = 8;

/// Back-off seconds between failed delivery attempts. Constant for now —
/// `notification_tx.send` failures are nearly always "no live receivers"
/// during a brief startup gap, not provider issues that need exponential
/// back-off.
const DELIVERY_RETRY_DELAY_SECS: u64 = 30;

/// Background worker that polls due `ExternalJob` rows and pushes their
/// outcome (artifact or failure notice) onto the notification broadcast.
pub struct ExternalJobsWorker {
    /// Store the job rows are listed from, updated in and GC'd from; also
    /// receives the session-history writeback after a delivery.
    store: Arc<RedbStore>,
    /// Broadcast sender used to hand success / failure messages to the
    /// channel side.
    notification_tx: broadcast::Sender<OutboundMessage>,
    /// Checked at the top of every tick for the drain signal, and asked
    /// for a work guard for each spawned job task.
    shutdown: ShutdownCoordinator,
    /// Runtime configuration; source of provider API keys and base URLs.
    config: Arc<RuntimeConfig>,
    /// HTTP client passed to the provider poll adapters and to the
    /// unauthenticated artifact download.
    client: reqwest::Client,
    /// Cap on concurrent per-job operations — see MAX_CONCURRENT_OPS.
    op_semaphore: Arc<Semaphore>,
}

impl ExternalJobsWorker {
    pub fn new(
        store: Arc<RedbStore>,
        notification_tx: broadcast::Sender<OutboundMessage>,
        shutdown: ShutdownCoordinator,
        config: Arc<RuntimeConfig>,
    ) -> Self {
        let client = reqwest::Client::builder()
            .timeout(Duration::from_secs(30))
            .build()
            .unwrap_or_default();
        Self {
            store,
            notification_tx,
            shutdown,
            config,
            client,
            op_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_OPS)),
        }
    }

    /// Main loop. Each tick: stop if the shutdown coordinator is draining,
    /// list due jobs, spawn one task per due job that does not already
    /// have a task, GC finished rows on every 12th tick, then sleep
    /// `TICK_SECS`.
    ///
    /// A spawned task can outlive a tick: it may wait for a semaphore
    /// permit, and a single provider call may take up to the HTTP
    /// client's timeout. Until the task advances the row, the same job
    /// may be listed as due again on the next tick. To avoid polling,
    /// downloading or delivering the same job twice, the loop keeps a
    /// local set of job ids that currently have a task and skips those.
    pub async fn run(self: Arc<Self>) {
        /// Ids of jobs that currently have a spawned task (waiting for a
        /// permit or being processed).
        type InFlight = Arc<std::sync::Mutex<std::collections::HashSet<String>>>;

        /// Releases a job's in-flight claim when dropped, so the claim is
        /// returned on every exit path of the task, including a panic.
        struct Claim {
            in_flight: InFlight,
            id: String,
        }

        impl Drop for Claim {
            fn drop(&mut self) {
                // A poisoned lock only means another task panicked while
                // holding it; the set itself is still usable.
                self.in_flight
                    .lock()
                    .unwrap_or_else(std::sync::PoisonError::into_inner)
                    .remove(&self.id);
            }
        }

        info!("external jobs worker started");
        let in_flight: InFlight =
            Arc::new(std::sync::Mutex::new(std::collections::HashSet::new()));
        let mut gc_counter: u32 = 0;
        loop {
            if self.shutdown.is_draining() {
                info!("external jobs worker: drain signaled, stopping");
                break;
            }
            let now = chrono::Utc::now().timestamp();
            match self.store.due_external_jobs(now) {
                Ok(jobs) if !jobs.is_empty() => {
                    debug!(count = jobs.len(), "external jobs: due tick");
                    for job in jobs {
                        // Claim the job id; the lock guard is a temporary
                        // and is released at the end of this statement,
                        // never held across an await.
                        let id = job.id.to_string();
                        let claimed = in_flight
                            .lock()
                            .unwrap_or_else(std::sync::PoisonError::into_inner)
                            .insert(id.clone());
                        if !claimed {
                            debug!(job_id = %id, "external jobs: already in flight, skipping");
                            continue;
                        }
                        let claim = Claim {
                            in_flight: Arc::clone(&in_flight),
                            id,
                        };

                        let worker = Arc::clone(&self);
                        let guard = self.shutdown.begin_work();
                        let sem = Arc::clone(&self.op_semaphore);
                        tokio::spawn(async move {
                            // Bound first so it is dropped last: the claim
                            // is only released once the task is finished.
                            let _claim = claim;
                            // acquire_owned awaits a permit so concurrent
                            // ops are bounded by MAX_CONCURRENT_OPS even if
                            // 100 jobs become due in the same tick.
                            let _permit = match sem.acquire_owned().await {
                                Ok(p) => p,
                                Err(_) => {
                                    drop(guard);
                                    return;
                                }
                            };
                            worker.process_job(job).await;
                            drop(guard);
                        });
                    }
                }
                Ok(_) => {}
                Err(e) => error!("external jobs: due query failed: {e:#}"),
            }

            // GC every ~12 ticks (~1 minute) — terminal rows older than the
            // retention window get dropped.
            gc_counter = gc_counter.wrapping_add(1);
            if gc_counter % 12 == 0 {
                if let Err(e) = self
                    .store
                    .cleanup_finished_external_jobs(FINISHED_RETENTION_SECS)
                {
                    warn!("external jobs: cleanup_finished failed: {e:#}");
                }
            }

            tokio::time::sleep(Duration::from_secs(TICK_SECS)).await;
        }
        info!("external jobs worker exited");
    }

    /// Route one due job to the right handler.
    ///
    /// A job that reports `needs_delivery()` goes to `retry_delivery`; a
    /// job still in `Pending` or `Polling` gets a poll cycle. Any other
    /// state is left alone.
    async fn process_job(&self, job: ExternalJob) {
        if job.needs_delivery() {
            return self.retry_delivery(job).await;
        }

        let still_in_flight = matches!(
            job.status,
            ExternalJobStatus::Pending | ExternalJobStatus::Polling
        );
        if still_in_flight {
            self.poll_cycle(job).await;
        }
        // Anything else is a row we shouldn't be acting on (e.g. already
        // delivered terminal). due_external_jobs already filtered, so
        // getting here means a concurrent state change — skip.
    }

    /// Full poll cycle: timeout sweep → poll → on terminal, attempt first
    /// delivery. If delivery fails, the row stays in a terminal status
    /// with `delivered_at = None` and gets picked up by `retry_delivery`
    /// on later ticks.
    ///
    /// Outcomes of the provider poll:
    /// - `Pending`: reschedule via `next_poll_delay_secs()` and clear any
    ///   previous error.
    /// - `Done(url)`: download the artifact; on success mark `Done`, on
    ///   download failure mark `Failed`; either way attempt delivery.
    /// - `Failed(msg)`: mark `Failed` and attempt delivery.
    /// - `Err`: treated as transient — reschedule and record the error.
    ///
    /// Store update failures are logged and do not abort the cycle, except
    /// for the initial `Polling` write, which returns early.
    async fn poll_cycle(&self, mut job: ExternalJob) {
        if job.is_timed_out() {
            job.status = ExternalJobStatus::TimedOut;
            job.error = Some(format!(
                "timed out after {}s",
                chrono::Utc::now().timestamp() - job.submitted_at
            ));
            if let Err(e) = self.store.update_external_job(&job) {
                error!(job_id = %job.id, "update failed: {e:#}");
            }
            self.attempt_delivery(&mut job).await;
            return;
        }

        // Mark `Polling` so a concurrent restart sweep sees it as in-flight.
        job.status = ExternalJobStatus::Polling;
        // Saturating, like `delivery_attempts`, so a long-lived job can
        // never overflow the counter.
        job.poll_count = job.poll_count.saturating_add(1);
        if let Err(e) = self.store.update_external_job(&job) {
            error!(job_id = %job.id, "update (polling) failed: {e:#}");
            return;
        }

        let outcome = self.dispatch_poll(&job).await;
        match outcome {
            Ok(PollOutcome::Pending) => {
                let now = chrono::Utc::now().timestamp();
                job.next_poll_at = now + job.next_poll_delay_secs() as i64;
                job.status = ExternalJobStatus::Pending;
                job.error = None;
                if let Err(e) = self.store.update_external_job(&job) {
                    error!(job_id = %job.id, "update (pending) failed: {e:#}");
                }
            }
            Ok(PollOutcome::Done(url)) => {
                // rsclaw VIDEO artifacts are served inline behind Bearer at
                // `/v1/videos/{id}/content` (no authless R2 presign), so they
                // must be downloaded WITH the rsclaw key (following the LB hop).
                // Everything else (agnes / openai / seedance public URLs, and
                // the rsclaw_image signed url) is fetched authless.
                let dl = if job.provider == "rsclaw" {
                    let key = self
                        .resolve_provider_key("rsclaw", "RSCLAW_API_KEY")
                        .unwrap_or_default();
                    rsclaw_jobs::download_artifact_authed(&url, &key, job.kind).await
                } else {
                    rsclaw_jobs::download_artifact(&self.client, &url, job.kind).await
                };
                // Nothing is persisted between the poll and this point, so
                // the URL can be moved into the job after the download
                // instead of being cloned before it.
                job.result_url = Some(url);
                match dl {
                    Ok(local_path) => {
                        job.result_path = Some(local_path);
                        job.status = ExternalJobStatus::Done;
                        job.error = None;
                        if let Err(e) = self.store.update_external_job(&job) {
                            error!(job_id = %job.id, "update (done) failed: {e:#}");
                        }
                        self.attempt_delivery(&mut job).await;
                    }
                    Err(e) => {
                        job.status = ExternalJobStatus::Failed;
                        job.error = Some(format!("download: {e:#}"));
                        if let Err(e2) = self.store.update_external_job(&job) {
                            error!(job_id = %job.id, "update (download-fail) failed: {e2:#}");
                        }
                        self.attempt_delivery(&mut job).await;
                    }
                }
            }
            Ok(PollOutcome::Failed(msg)) => {
                job.status = ExternalJobStatus::Failed;
                job.error = Some(msg);
                if let Err(e) = self.store.update_external_job(&job) {
                    error!(job_id = %job.id, "update (failed) failed: {e:#}");
                }
                self.attempt_delivery(&mut job).await;
            }
            Err(e) => {
                // Transient error — schedule next poll, keep job alive.
                let now = chrono::Utc::now().timestamp();
                job.next_poll_at = now + job.next_poll_delay_secs() as i64;
                job.status = ExternalJobStatus::Pending;
                job.error = Some(format!("poll: {e:#}"));
                warn!(job_id = %job.id, error = %e, "external jobs: transient poll error");
                if let Err(e2) = self.store.update_external_job(&job) {
                    error!(job_id = %job.id, "update (transient) failed: {e2:#}");
                }
            }
        }
    }

    /// Second-and-later delivery attempts for a job that is already
    /// terminal. Logs the attempt count so far, then goes through
    /// `attempt_delivery` so bookkeeping matches the first attempt.
    async fn retry_delivery(&self, mut job: ExternalJob) {
        let attempts = job.delivery_attempts;
        info!(job_id = %job.id, attempts, "external jobs: retrying delivery");
        self.attempt_delivery(&mut job).await;
    }

    /// One delivery attempt for a terminal job.
    ///
    /// Bumps `delivery_attempts`, then sends either the artifact (status
    /// `Done`) or a failure notice (any other status). If the send was
    /// rejected, `next_poll_at` is pushed out by
    /// `DELIVERY_RETRY_DELAY_SECS` and the row is saved for a later retry.
    /// If it was accepted, `delivered_at` is stamped, the row is saved,
    /// and the outcome is appended to the session history.
    async fn attempt_delivery(&self, job: &mut ExternalJob) {
        job.delivery_attempts = job.delivery_attempts.saturating_add(1);

        let is_done = matches!(job.status, ExternalJobStatus::Done);
        let accepted = if is_done {
            self.deliver_success(job).await
        } else {
            self.deliver_failure(job).await
        };

        if !accepted {
            job.next_poll_at =
                chrono::Utc::now().timestamp() + DELIVERY_RETRY_DELAY_SECS as i64;
            warn!(
                job_id = %job.id,
                attempts = job.delivery_attempts,
                "external jobs: delivery failed, will retry"
            );
            if let Err(e) = self.store.update_external_job(job) {
                error!(job_id = %job.id, "update (delivery-retry) failed: {e:#}");
            }
            return;
        }

        job.delivered_at = Some(chrono::Utc::now().timestamp());
        if let Err(e) = self.store.update_external_job(job) {
            error!(job_id = %job.id, "update (delivered) failed: {e:#}");
        }
        // Session-history writeback is best-effort: it lets the agent see
        // the result on its next turn, but a failure here only gets a
        // debug log and never undoes the delivery itself.
        if let Err(e) = self.write_back_to_session(job) {
            debug!(job_id = %job.id, "session writeback skipped: {e:#}");
        }
    }

    /// Record the job outcome in the originating session as a synthetic
    /// assistant message carrying `external_job_id`.
    ///
    /// For a `Done` job the message names the result path; for any other
    /// status it names the outcome ("failed", "timed out" or "ended") and
    /// the stored error text. Missing path / error text becomes an empty
    /// string.
    ///
    /// # Errors
    /// Returns an error when the store's `append_message` fails.
    fn write_back_to_session(&self, job: &ExternalJob) -> Result<()> {
        let kind_label = match job.kind {
            ExternalJobKind::VideoGen => "video",
            ExternalJobKind::ImageGen => "image",
        };

        // `None` stands for the successful case; everything else carries
        // the word used in the failure message.
        let failure_word = match job.status {
            ExternalJobStatus::Done => None,
            ExternalJobStatus::Failed => Some("failed"),
            ExternalJobStatus::TimedOut => Some("timed out"),
            _ => Some("ended"),
        };

        let content = match failure_word {
            None => {
                let path = job.result_path.as_deref().unwrap_or("");
                format!("[{kind_label} generation complete] {path}")
            }
            Some(word) => format!(
                "[{kind_label} generation {}] {}",
                word,
                job.error.as_deref().unwrap_or("")
            ),
        };

        let msg = json!({
            "role": "assistant",
            "content": content,
            "external_job_id": job.id,
        });

        match self.store.append_message(&job.session_key, &msg) {
            Ok(_) => Ok(()),
            Err(e) => Err(anyhow!("append_message: {e}")),
        }
    }

    /// Route a poll to the adapter that matches `job.provider`.
    ///
    /// Keyed providers resolve their credential first (config entry, then
    /// environment variable) and fail with an error when none is found.
    /// `rsclaw_image` needs no key. An unrecognised provider name is an
    /// error.
    async fn dispatch_poll(&self, job: &ExternalJob) -> Result<PollOutcome> {
        let http = &self.client;
        let task_id = &job.external_task_id;
        let provider = job.provider.as_str();

        match provider {
            "seedance" => {
                // Seedance credentials are stored under the "doubao"
                // provider entry.
                let key = self
                    .resolve_provider_key("doubao", "ARK_API_KEY")
                    .ok_or_else(|| anyhow!("seedance: no API key configured"))?;
                rsclaw_jobs::poll_seedance(http, &key, task_id).await
            }
            "minimax" => {
                let key = self
                    .resolve_provider_key("minimax", "MINIMAX_API_KEY")
                    .ok_or_else(|| anyhow!("minimax: no API key configured"))?;
                rsclaw_jobs::poll_minimax(http, &key, task_id).await
            }
            "kling" => {
                // Kling needs an access/secret pair rather than one token.
                let (ak, sk) = self.resolve_kling_keys().ok_or_else(|| {
                    anyhow!("kling: KLING_ACCESS_KEY / KLING_SECRET_KEY not configured")
                })?;
                rsclaw_jobs::poll_kling(http, &ak, &sk, task_id).await
            }
            "rsclaw" => {
                let key = self
                    .resolve_provider_key("rsclaw", "RSCLAW_API_KEY")
                    .ok_or_else(|| anyhow!("rsclaw: no API key configured"))?;
                rsclaw_jobs::poll_rsclaw(&key, task_id).await
            }
            "agnes" => {
                let key = self
                    .resolve_provider_key("agnes", "AGNES_API_KEY")
                    .ok_or_else(|| anyhow!("agnes: no API key configured"))?;
                rsclaw_jobs::poll_agnes(http, &key, task_id).await
            }
            "openai" => {
                let key = self
                    .resolve_provider_key("openai", "OPENAI_API_KEY")
                    .ok_or_else(|| anyhow!("openai: no API key configured"))?;
                let base = self.resolve_provider_base_url("openai");
                rsclaw_jobs::poll_openai_video(http, &base, &key, task_id).await
            }
            // Async rsclaw image (image-edit / t2i-v2). external_task_id is the
            // signed, authless poll URL; no provider key needed.
            "rsclaw_image" => rsclaw_jobs::poll_rsclaw_image(http, task_id).await,
            other => Err(anyhow!("no async polling adapter for provider: {other}")),
        }
    }

    /// Base URL for `provider`: the `base_url` from its config entry when
    /// one is set, otherwise the first element of
    /// `rsclaw_provider::defaults::resolve_base_url(provider)`.
    fn resolve_provider_base_url(&self, provider: &str) -> String {
        let configured = self
            .config
            .model
            .models
            .as_ref()
            .and_then(|m| m.providers.get(provider))
            .and_then(|p| p.base_url.clone());

        match configured {
            Some(url) => url,
            None => rsclaw_provider::defaults::resolve_base_url(provider).0,
        }
    }

    /// Look up a single-token credential for `provider`.
    ///
    /// The plain `api_key` of the provider's config entry wins; when the
    /// entry, the key, or its plain form is absent, the environment
    /// variable `env_var` is used instead. Returns `None` when neither
    /// source yields a value.
    fn resolve_provider_key(&self, provider: &str, env_var: &str) -> Option<String> {
        let provider_cfg = self
            .config
            .model
            .models
            .as_ref()
            .and_then(|m| m.providers.get(provider));

        let configured = provider_cfg
            .and_then(|p| p.api_key.as_ref())
            .and_then(|k| k.as_plain().map(str::to_owned));
        if configured.is_some() {
            return configured;
        }

        std::env::var(env_var).ok()
    }

    /// Kling credentials as an `(access_key, secret_key)` pair.
    ///
    /// The access key comes from `resolve_provider_key("kling", ..)`; the
    /// secret key is read from the `KLING_SECRET_KEY` environment variable
    /// only. Yields `None` unless both are present, so the caller can fail
    /// loudly rather than build a bad JWT.
    fn resolve_kling_keys(&self) -> Option<(String, String)> {
        self.resolve_provider_key("kling", "KLING_ACCESS_KEY")
            .and_then(|access| {
                std::env::var("KLING_SECRET_KEY")
                    .ok()
                    .map(|secret| (access, secret))
            })
    }

    /// Send the finished artifact as a file attachment on the notification
    /// broadcast.
    ///
    /// The message text is the kind label plus the first 80 characters of
    /// the prompt; the attachment carries the file name, a MIME type fixed
    /// per job kind, and the local path. Returns `true` iff the broadcast
    /// accepted the message — `attempt_delivery` uses that to choose
    /// between stamping `delivered_at` and scheduling a retry.
    async fn deliver_success(&self, job: &ExternalJob) -> bool {
        let (kind_label, mime) = match job.kind {
            ExternalJobKind::VideoGen => ("video", "video/mp4"),
            ExternalJobKind::ImageGen => ("image", "image/png"),
        };

        let path = job.result_path.as_deref().unwrap_or("");
        // Fall back to the whole path when it has no usable final component.
        let filename = match std::path::Path::new(path)
            .file_name()
            .and_then(|name| name.to_str())
        {
            Some(name) => name,
            None => path,
        };

        let prompt_preview: String = job.prompt.chars().take(80).collect();
        let delivery = &job.delivery;
        let out = OutboundMessage {
            target_id: delivery.target_id.clone(),
            is_group: delivery.is_group,
            text: format!("[{kind_label}] {prompt_preview}"),
            reply_to: delivery.reply_to.clone(),
            images: vec![],
            files: vec![(filename.to_string(), mime.to_string(), path.to_string())],
            channel: Some(delivery.channel.clone()),
            account: delivery.account.clone(),
        };

        let sent = self.notification_tx.send(out);
        if let Err(e) = &sent {
            warn!(job_id = %job.id, "deliver_success: notification_tx failed: {e}");
        }
        sent.is_ok()
    }

    /// Send a text-only failure notice on the notification broadcast.
    ///
    /// The text names the job kind, the first 80 characters of the prompt
    /// and the stored error (or "unknown error" when none was recorded).
    /// Returns `true` iff the broadcast accepted the message.
    async fn deliver_failure(&self, job: &ExternalJob) -> bool {
        let kind_label = match job.kind {
            ExternalJobKind::VideoGen => "video",
            ExternalJobKind::ImageGen => "image",
        };
        let prompt_preview: String = job.prompt.chars().take(80).collect();
        let reason = job.error.as_deref().unwrap_or("unknown error");

        let delivery = &job.delivery;
        let out = OutboundMessage {
            target_id: delivery.target_id.clone(),
            is_group: delivery.is_group,
            text: format!("[{kind_label} failed] {prompt_preview}: {reason}"),
            reply_to: delivery.reply_to.clone(),
            images: vec![],
            files: vec![],
            channel: Some(delivery.channel.clone()),
            account: delivery.account.clone(),
        };

        let sent = self.notification_tx.send(out);
        if let Err(e) = &sent {
            warn!(job_id = %job.id, "deliver_failure: notification_tx failed: {e}");
        }
        sent.is_ok()
    }
}