forge_runtime/cluster/
registry.rs1use forge_core::cluster::{NodeInfo, NodeStatus};
2use forge_core::{ForgeError, Result};
3
4pub struct NodeRegistry {
6 pool: sqlx::PgPool,
7 local_node: NodeInfo,
8}
9
10impl NodeRegistry {
11 pub fn new(pool: sqlx::PgPool, local_node: NodeInfo) -> Self {
12 Self { pool, local_node }
13 }
14
15 pub async fn register(&self) -> Result<()> {
16 let roles: Vec<String> = self
17 .local_node
18 .roles
19 .iter()
20 .map(|r| r.as_str().to_string())
21 .collect();
22
23 sqlx::query!(
24 r#"
25 INSERT INTO forge_nodes (
26 id, hostname, ip_address, http_port, grpc_port,
27 roles, worker_capabilities, status, version, started_at, last_heartbeat
28 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW())
29 ON CONFLICT (id) DO UPDATE SET
30 hostname = EXCLUDED.hostname,
31 ip_address = EXCLUDED.ip_address,
32 http_port = EXCLUDED.http_port,
33 grpc_port = EXCLUDED.grpc_port,
34 roles = EXCLUDED.roles,
35 worker_capabilities = EXCLUDED.worker_capabilities,
36 status = EXCLUDED.status,
37 version = EXCLUDED.version,
38 last_heartbeat = NOW()
39 "#,
40 self.local_node.id.as_uuid(),
41 &self.local_node.hostname,
42 self.local_node.ip_address.to_string(),
43 self.local_node.http_port as i32,
44 self.local_node.grpc_port as i32,
45 &roles,
46 &self.local_node.worker_capabilities,
47 self.local_node.status.as_str(),
48 &self.local_node.version,
49 self.local_node.started_at,
50 )
51 .execute(&self.pool)
52 .await
53 .map_err(ForgeError::Database)?;
54
55 Ok(())
56 }
57
58 pub async fn set_status(&self, status: NodeStatus) -> Result<()> {
59 sqlx::query!(
60 r#"
61 UPDATE forge_nodes
62 SET status = $2
63 WHERE id = $1
64 "#,
65 self.local_node.id.as_uuid(),
66 status.as_str(),
67 )
68 .execute(&self.pool)
69 .await
70 .map_err(ForgeError::Database)?;
71
72 Ok(())
73 }
74
75 pub async fn deregister(&self) -> Result<()> {
76 sqlx::query!(
77 r#"
78 DELETE FROM forge_nodes WHERE id = $1
79 "#,
80 self.local_node.id.as_uuid(),
81 )
82 .execute(&self.pool)
83 .await
84 .map_err(ForgeError::Database)?;
85
86 Ok(())
87 }
88}