Skip to main content

forge_core/cluster/
roles.rs

1use std::str::FromStr;
2
3/// Node role in the cluster.
4#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
5pub enum NodeRole {
6    /// HTTP gateway for client requests.
7    Gateway,
8    /// Function executor.
9    Function,
10    /// Background job worker.
11    Worker,
12    /// Scheduler (leader-only) for crons and job assignment.
13    Scheduler,
14}
15
16impl NodeRole {
17    /// Convert to string for database storage.
18    pub fn as_str(&self) -> &'static str {
19        match self {
20            Self::Gateway => "gateway",
21            Self::Function => "function",
22            Self::Worker => "worker",
23            Self::Scheduler => "scheduler",
24        }
25    }
26
27    /// Get all default roles.
28    pub fn all() -> Vec<Self> {
29        vec![Self::Gateway, Self::Function, Self::Worker, Self::Scheduler]
30    }
31}
32
33#[derive(Debug, Clone, PartialEq, Eq)]
34pub struct ParseNodeRoleError(pub String);
35
36impl std::fmt::Display for ParseNodeRoleError {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        write!(f, "invalid node role: {}", self.0)
39    }
40}
41
42impl std::error::Error for ParseNodeRoleError {}
43
44impl FromStr for NodeRole {
45    type Err = ParseNodeRoleError;
46
47    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
48        match s {
49            "gateway" => Ok(Self::Gateway),
50            "function" => Ok(Self::Function),
51            "worker" => Ok(Self::Worker),
52            "scheduler" => Ok(Self::Scheduler),
53            _ => Err(ParseNodeRoleError(s.to_string())),
54        }
55    }
56}
57
58impl std::fmt::Display for NodeRole {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        write!(f, "{}", self.as_str())
61    }
62}
63
64/// Leader role for coordinated operations.
65#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
66pub enum LeaderRole {
67    /// Job assignment and cron triggering.
68    Scheduler,
69    /// Metrics aggregation.
70    MetricsAggregator,
71    /// Log compaction.
72    LogCompactor,
73}
74
75impl LeaderRole {
76    /// Get the PostgreSQL advisory lock ID for this role.
77    pub fn lock_id(&self) -> i64 {
78        // Use a unique ID based on "FORGE" + role number
79        // 0x464F524745 = "FORGE" in hex
80        match self {
81            Self::Scheduler => 0x464F_5247_0001,
82            Self::MetricsAggregator => 0x464F_5247_0002,
83            Self::LogCompactor => 0x464F_5247_0003,
84        }
85    }
86
87    /// Convert to string for database storage.
88    pub fn as_str(&self) -> &'static str {
89        match self {
90            Self::Scheduler => "scheduler",
91            Self::MetricsAggregator => "metrics_aggregator",
92            Self::LogCompactor => "log_compactor",
93        }
94    }
95}
96
97#[derive(Debug, Clone, PartialEq, Eq)]
98pub struct ParseLeaderRoleError(pub String);
99
100impl std::fmt::Display for ParseLeaderRoleError {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        write!(f, "invalid leader role: {}", self.0)
103    }
104}
105
106impl std::error::Error for ParseLeaderRoleError {}
107
108impl FromStr for LeaderRole {
109    type Err = ParseLeaderRoleError;
110
111    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
112        match s {
113            "scheduler" => Ok(Self::Scheduler),
114            "metrics_aggregator" => Ok(Self::MetricsAggregator),
115            "log_compactor" => Ok(Self::LogCompactor),
116            _ => Err(ParseLeaderRoleError(s.to_string())),
117        }
118    }
119}
120
121#[cfg(test)]
122#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
123mod tests {
124    use super::*;
125
126    #[test]
127    fn test_node_role_conversion() {
128        assert_eq!("gateway".parse::<NodeRole>(), Ok(NodeRole::Gateway));
129        assert_eq!("worker".parse::<NodeRole>(), Ok(NodeRole::Worker));
130        assert!("invalid".parse::<NodeRole>().is_err());
131        assert_eq!(NodeRole::Gateway.as_str(), "gateway");
132    }
133
134    #[test]
135    fn test_all_roles() {
136        let roles = NodeRole::all();
137        assert_eq!(roles.len(), 4);
138        assert!(roles.contains(&NodeRole::Gateway));
139        assert!(roles.contains(&NodeRole::Scheduler));
140    }
141
142    #[test]
143    fn test_leader_role_lock_ids() {
144        // Each leader role should have a unique lock ID
145        let scheduler_id = LeaderRole::Scheduler.lock_id();
146        let metrics_id = LeaderRole::MetricsAggregator.lock_id();
147        let log_id = LeaderRole::LogCompactor.lock_id();
148
149        assert_ne!(scheduler_id, metrics_id);
150        assert_ne!(metrics_id, log_id);
151        assert_ne!(scheduler_id, log_id);
152    }
153
154    #[test]
155    fn test_leader_role_conversion() {
156        assert_eq!("scheduler".parse::<LeaderRole>(), Ok(LeaderRole::Scheduler));
157        assert!("invalid".parse::<LeaderRole>().is_err());
158        assert_eq!(LeaderRole::Scheduler.as_str(), "scheduler");
159    }
160}