Skip to main content

forge_runtime/cluster/
registry.rs

1use forge_core::cluster::{NodeInfo, NodeStatus};
2use forge_core::{ForgeError, Result};
3
4/// Node registry for cluster membership.
5pub struct NodeRegistry {
6    pool: sqlx::PgPool,
7    local_node: NodeInfo,
8}
9
10impl NodeRegistry {
11    pub fn new(pool: sqlx::PgPool, local_node: NodeInfo) -> Self {
12        Self { pool, local_node }
13    }
14
15    pub async fn register(&self) -> Result<()> {
16        let roles: Vec<String> = self
17            .local_node
18            .roles
19            .iter()
20            .map(|r| r.as_str().to_string())
21            .collect();
22
23        sqlx::query!(
24            r#"
25            INSERT INTO forge_nodes (
26                id, hostname, ip_address, http_port, grpc_port,
27                roles, worker_capabilities, status, version, started_at, last_heartbeat
28            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW())
29            ON CONFLICT (id) DO UPDATE SET
30                hostname = EXCLUDED.hostname,
31                ip_address = EXCLUDED.ip_address,
32                http_port = EXCLUDED.http_port,
33                grpc_port = EXCLUDED.grpc_port,
34                roles = EXCLUDED.roles,
35                worker_capabilities = EXCLUDED.worker_capabilities,
36                status = EXCLUDED.status,
37                version = EXCLUDED.version,
38                last_heartbeat = NOW()
39            "#,
40            self.local_node.id.as_uuid(),
41            &self.local_node.hostname,
42            self.local_node.ip_address.to_string(),
43            self.local_node.http_port as i32,
44            self.local_node.grpc_port as i32,
45            &roles,
46            &self.local_node.worker_capabilities,
47            self.local_node.status.as_str(),
48            &self.local_node.version,
49            self.local_node.started_at,
50        )
51        .execute(&self.pool)
52        .await
53        .map_err(ForgeError::Database)?;
54
55        Ok(())
56    }
57
58    pub async fn set_status(&self, status: NodeStatus) -> Result<()> {
59        sqlx::query!(
60            r#"
61            UPDATE forge_nodes
62            SET status = $2
63            WHERE id = $1
64            "#,
65            self.local_node.id.as_uuid(),
66            status.as_str(),
67        )
68        .execute(&self.pool)
69        .await
70        .map_err(ForgeError::Database)?;
71
72        Ok(())
73    }
74
75    pub async fn deregister(&self) -> Result<()> {
76        sqlx::query!(
77            r#"
78            DELETE FROM forge_nodes WHERE id = $1
79            "#,
80            self.local_node.id.as_uuid(),
81        )
82        .execute(&self.pool)
83        .await
84        .map_err(ForgeError::Database)?;
85
86        Ok(())
87    }
88}