Skip to main content

forge_core/cluster/
roles.rs

1use std::str::FromStr;
2
/// Role a node can take on within the cluster.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum NodeRole {
    /// Handles client requests as the HTTP gateway.
    Gateway,
    /// Executes functions.
    Function,
    /// Runs background jobs.
    Worker,
    /// Leader-only scheduler responsible for crons and job assignment.
    Scheduler,
}

impl NodeRole {
    /// Returns the lowercase textual name of this role.
    ///
    /// The returned text is the same spelling that the `FromStr`
    /// implementation for `NodeRole` accepts.
    pub fn as_str(&self) -> &'static str {
        match *self {
            NodeRole::Gateway => "gateway",
            NodeRole::Function => "function",
            NodeRole::Worker => "worker",
            NodeRole::Scheduler => "scheduler",
        }
    }

    /// Returns every role in declaration order as a freshly allocated `Vec`.
    pub fn all() -> Vec<Self> {
        [
            NodeRole::Gateway,
            NodeRole::Function,
            NodeRole::Worker,
            NodeRole::Scheduler,
        ]
        .to_vec()
    }
}
31
/// Error produced when a string does not name a known [`NodeRole`].
///
/// The wrapped `String` is the input text that was rejected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseNodeRoleError(pub String);

impl std::fmt::Display for ParseNodeRoleError {
    /// Writes `invalid node role: ` followed by the rejected input.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Written piecewise; width/precision flags on `f` are not applied,
        // exactly as with a nested `write!`.
        f.write_str("invalid node role: ")?;
        f.write_str(&self.0)
    }
}

impl std::error::Error for ParseNodeRoleError {}
42
43impl FromStr for NodeRole {
44    type Err = ParseNodeRoleError;
45
46    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
47        match s {
48            "gateway" => Ok(Self::Gateway),
49            "function" => Ok(Self::Function),
50            "worker" => Ok(Self::Worker),
51            "scheduler" => Ok(Self::Scheduler),
52            _ => Err(ParseNodeRoleError(s.to_string())),
53        }
54    }
55}
56
57impl std::fmt::Display for NodeRole {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        write!(f, "{}", self.as_str())
60    }
61}
62
/// Role held by a single elected leader for coordinated operations.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum LeaderRole {
    /// Assigns jobs and triggers crons.
    Scheduler,
    /// Aggregates metrics.
    MetricsAggregator,
    /// Compacts logs.
    LogCompactor,
    /// A leader-elected daemon instance; the payload is the daemon's name.
    Daemon(String),
}

impl LeaderRole {
    /// Returns the advisory lock ID for this role.
    ///
    /// The built-in roles use fixed IDs `0x464F_5247_0001` through
    /// `0x464F_5247_0003` (the bytes `46 4F 52 47` spell "FORG" in ASCII).
    /// A `Daemon` role derives its ID from the daemon name with an
    /// FNV-1a-style hash: starting from the seed `0x464F_5247_4000`, each
    /// byte of the name is XOR-ed in and the result is wrapping-multiplied by
    /// the 64-bit FNV prime. An empty name therefore yields the seed itself.
    pub fn lock_id(&self) -> i64 {
        const DAEMON_SEED: i64 = 0x464F_5247_4000;
        const FNV_PRIME: i64 = 0x0100_0000_01b3; // 1099511628211

        match self {
            LeaderRole::Scheduler => 0x464F_5247_0001,
            LeaderRole::MetricsAggregator => 0x464F_5247_0002,
            LeaderRole::LogCompactor => 0x464F_5247_0003,
            LeaderRole::Daemon(name) => name.bytes().fold(DAEMON_SEED, |acc, byte| {
                // XOR first, multiply second (the FNV-1a ordering).
                (acc ^ i64::from(byte)).wrapping_mul(FNV_PRIME)
            }),
        }
    }

    /// Returns the textual name of this role.
    ///
    /// Built-in roles yield a fixed snake_case name; a `Daemon` yields its
    /// own name unchanged.
    pub fn as_str(&self) -> &str {
        match self {
            LeaderRole::Daemon(name) => name.as_str(),
            LeaderRole::Scheduler => "scheduler",
            LeaderRole::MetricsAggregator => "metrics_aggregator",
            LeaderRole::LogCompactor => "log_compactor",
        }
    }
}
107
/// Error type associated with parsing a [`LeaderRole`] from text.
///
/// The wrapped `String` is the offending input text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseLeaderRoleError(pub String);

impl std::fmt::Display for ParseLeaderRoleError {
    /// Writes `invalid leader role: ` followed by the wrapped text.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Written piecewise; width/precision flags on `f` are not applied,
        // exactly as with a nested `write!`.
        f.write_str("invalid leader role: ")?;
        f.write_str(&self.0)
    }
}

impl std::error::Error for ParseLeaderRoleError {}
118
119impl FromStr for LeaderRole {
120    type Err = ParseLeaderRoleError;
121
122    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
123        match s {
124            "scheduler" => Ok(Self::Scheduler),
125            "metrics_aggregator" => Ok(Self::MetricsAggregator),
126            "log_compactor" => Ok(Self::LogCompactor),
127            other => Ok(Self::Daemon(other.to_string())),
128        }
129    }
130}
131
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// Builds a `Daemon` leader role with the given name.
    fn daemon(name: &str) -> LeaderRole {
        LeaderRole::Daemon(name.to_string())
    }

    #[test]
    fn test_node_role_conversion() {
        let gateway = "gateway".parse::<NodeRole>();
        let worker = "worker".parse::<NodeRole>();
        let invalid = "invalid".parse::<NodeRole>();

        assert_eq!(gateway, Ok(NodeRole::Gateway));
        assert_eq!(worker, Ok(NodeRole::Worker));
        assert!(invalid.is_err());
        assert_eq!(NodeRole::Gateway.as_str(), "gateway");
    }

    #[test]
    fn test_all_roles() {
        let roles = NodeRole::all();

        assert_eq!(roles.len(), 4);
        for expected in [NodeRole::Gateway, NodeRole::Scheduler].iter() {
            assert!(roles.contains(expected));
        }
    }

    #[test]
    fn test_leader_role_lock_ids() {
        // The three built-in leader roles must not share a lock ID.
        let ids = [
            LeaderRole::Scheduler.lock_id(),
            LeaderRole::MetricsAggregator.lock_id(),
            LeaderRole::LogCompactor.lock_id(),
        ];

        assert_ne!(ids[0], ids[1]);
        assert_ne!(ids[1], ids[2]);
        assert_ne!(ids[0], ids[2]);
    }

    #[test]
    fn test_leader_role_conversion() {
        let scheduler = "scheduler".parse::<LeaderRole>();
        let custom = "my_daemon".parse::<LeaderRole>();

        assert_eq!(scheduler, Ok(LeaderRole::Scheduler));
        assert_eq!(custom, Ok(daemon("my_daemon")));
        assert_eq!(LeaderRole::Scheduler.as_str(), "scheduler");
        assert_eq!(daemon("my_daemon").as_str(), "my_daemon");
    }

    #[test]
    fn test_daemon_lock_ids_are_unique_and_stable() {
        let a = daemon("daemon_a").lock_id();
        let b = daemon("daemon_b").lock_id();

        // Different names differ; the same name is reproducible.
        assert_ne!(a, b);
        assert_eq!(a, daemon("daemon_a").lock_id());

        // A daemon ID must not coincide with any built-in role's ID.
        let built_in = [
            LeaderRole::Scheduler,
            LeaderRole::MetricsAggregator,
            LeaderRole::LogCompactor,
        ];
        for role in built_in.iter() {
            assert_ne!(a, role.lock_id());
        }
    }
}