Skip to main content

forge_core/config/
worker.rs

//! Worker (job queue) configuration.
2
3use std::collections::BTreeMap;
4use std::time::Duration;
5
6use serde::{Deserialize, Serialize};
7
8use super::types::DurationStr;
9
10/// Worker configuration.
11///
12/// Worker pools are reserved per queue so heavy traffic on one queue cannot
13/// starve another. Defaults: `default=8`, `workflows=4`, `cron=2`. Operators
14/// override via `[worker.queues.<name>]` in `forge.toml`.
15#[derive(Debug, Clone, Serialize, Deserialize)]
16#[non_exhaustive]
17pub struct WorkerConfig {
18    /// Job timeout duration (e.g. "1h", "30m").
19    #[serde(default = "default_job_timeout")]
20    pub job_timeout: DurationStr,
21
22    /// Poll interval duration (e.g. "100ms", "1s"). Wakeups are NOTIFY-driven;
23    /// this is the fallback cadence when no `forge_jobs_available` arrives.
24    #[serde(default = "default_poll_interval")]
25    pub poll_interval: DurationStr,
26
27    /// Per-queue worker pool reservations. The `default` queue handles
28    /// untagged user jobs; `workflows` handles `$workflow_resume`; `cron`
29    /// handles `$cron:*`. Add custom queues by tagging jobs with a worker
30    /// capability and configuring a matching entry here.
31    #[serde(default = "default_queues")]
32    pub queues: BTreeMap<String, QueueWorkerConfig>,
33}
34
35/// Per-queue worker pool reservation.
36#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
37#[non_exhaustive]
38pub struct QueueWorkerConfig {
39    /// Number of concurrent worker tasks for this queue.
40    pub workers: usize,
41}
42
43impl QueueWorkerConfig {
44    /// Construct a queue config with the given worker count.
45    pub const fn new(workers: usize) -> Self {
46        Self { workers }
47    }
48}
49
50impl Default for WorkerConfig {
51    fn default() -> Self {
52        Self {
53            job_timeout: default_job_timeout(),
54            poll_interval: default_poll_interval(),
55            queues: default_queues(),
56        }
57    }
58}
59
/// Name of the queue set aside for `$workflow_resume` jobs.
pub const WORKFLOWS_QUEUE: &str = "workflows";
62
/// Name of the queue set aside for `$cron:<name>` jobs.
pub const CRON_QUEUE: &str = "cron";
65
/// Name of the queue that picks up user jobs carrying no queue tag.
pub const DEFAULT_QUEUE: &str = "default";
68
69fn default_queues() -> BTreeMap<String, QueueWorkerConfig> {
70    let mut q = BTreeMap::new();
71    q.insert(DEFAULT_QUEUE.to_string(), QueueWorkerConfig::new(8));
72    q.insert(WORKFLOWS_QUEUE.to_string(), QueueWorkerConfig::new(4));
73    q.insert(CRON_QUEUE.to_string(), QueueWorkerConfig::new(2));
74    q
75}
76
77fn default_job_timeout() -> DurationStr {
78    DurationStr::new(Duration::from_secs(3600))
79}
80
81fn default_poll_interval() -> DurationStr {
82    DurationStr::new(Duration::from_millis(100))
83}
84
#[cfg(test)]
mod tests {
    use super::*;

    /// The built-in configuration reserves 8/4/2 workers for the default,
    /// workflows and cron queues.
    #[test]
    fn default_queues_match_plan() {
        let queues = WorkerConfig::default().queues;
        let expected = [(DEFAULT_QUEUE, 8), (WORKFLOWS_QUEUE, 4), (CRON_QUEUE, 2)];
        for &(name, workers) in expected.iter() {
            assert_eq!(queues.get(name), Some(&QueueWorkerConfig::new(workers)));
        }
    }

    /// Queue tables in TOML land in the `queues` map, both for a built-in
    /// name (`default`) and a custom one (`media`).
    #[test]
    fn deserialize_queue_overrides() {
        let input = r#"
            [queues.default]
            workers = 16

            [queues.media]
            workers = 3
        "#;
        let parsed: WorkerConfig = toml::from_str(input).expect("parse");
        let workers_for = |name: &str| parsed.queues.get(name).map(|q| q.workers);
        assert_eq!(workers_for("default"), Some(16));
        assert_eq!(workers_for("media"), Some(3));
    }
}