// convergio_mesh/sync_loop.rs
use std::collections::HashMap;
use std::time::{Duration, Instant};

use convergio_db::pool::ConnPool;
use tracing::{error, info, warn};

use crate::peers_registry::peers_conf_path_from_env;
use crate::peers_types::{PeerConfig, PeersRegistry};
use crate::sync::sync_table_with_peer;
use crate::transport::{resolve_best_addr, update_mesh_sync_stats};
use crate::types::SYNC_TABLES;

18pub fn spawn_sync_loop(pool: ConnPool, interval: Duration) {
20 tokio::spawn(async move {
21 info!(interval_secs = interval.as_secs(), "mesh sync loop started");
22 loop {
23 tokio::time::sleep(interval).await;
24 let pool_clone = pool.clone();
25 if let Err(e) = tokio::task::spawn_blocking(move || run_sync_round(&pool_clone)).await {
26 error!("sync round task panicked: {e}");
27 }
28 }
29 });
30}
31
32fn run_sync_round(pool: &ConnPool) {
33 let conf_path = std::path::PathBuf::from(peers_conf_path_from_env());
34 let registry = match PeersRegistry::load(&conf_path) {
35 Ok(r) => r,
36 Err(e) => {
37 warn!("peers registry load failed: {e}");
38 return;
39 }
40 };
41
42 let conn = match pool.get() {
43 Ok(c) => c,
44 Err(e) => {
45 error!("db pool get failed: {e}");
46 return;
47 }
48 };
49
50 let local_ts = crate::transport::detect_local_tailscale_ip();
51
52 for (name, peer) in registry.list_active() {
53 if let Some(ref lip) = local_ts {
55 if peer.tailscale_ip == *lip {
56 continue;
57 }
58 }
59 let fields = peer_to_fields(peer);
60 let Some(addr) = resolve_best_addr(name, &fields) else {
61 warn!(peer = name, "no reachable address, skipping");
62 continue;
63 };
64
65 let started = Instant::now();
66 let mut total_sent = 0usize;
67 let mut total_received = 0usize;
68 let mut total_applied = 0usize;
69
70 for table in SYNC_TABLES {
71 let (sent, received, applied) = sync_table_with_peer(&conn, &addr, table);
72 total_sent += sent;
73 total_received += received;
74 total_applied += applied;
75 if sent > 0 || received > 0 || applied > 0 {
76 info!(peer = name, table, sent, received, applied, "table synced");
77 }
78 }
79
80 let latency_ms = started.elapsed().as_millis() as i64;
81 update_mesh_sync_stats(
82 &conn,
83 &addr,
84 total_sent,
85 total_received,
86 total_applied,
87 latency_ms,
88 );
89
90 info!(
91 peer = name,
92 addr = %addr,
93 sent = total_sent,
94 received = total_received,
95 applied = total_applied,
96 latency_ms,
97 "sync round complete"
98 );
99
100 send_heartbeat_to_peer(&addr, ®istry.shared_secret);
102 }
103}
104
105fn send_heartbeat_to_peer(addr: &str, shared_secret: &str) {
106 let url = format!("http://{addr}/api/heartbeat");
107 let role = std::env::var("CONVERGIO_NODE_ROLE").unwrap_or_default();
108 let body = serde_json::json!({
109 "peer": local_node_name(),
110 "version": env!("CARGO_PKG_VERSION"),
111 "role": role,
112 });
113 let body_bytes = serde_json::to_vec(&body).unwrap_or_default();
114
115 let client = reqwest::blocking::Client::builder()
116 .timeout(Duration::from_secs(5))
117 .build();
118 let Ok(client) = client else { return };
119 let token = std::env::var("CONVERGIO_AUTH_TOKEN").unwrap_or_else(|_| "dev-local".into());
120 let mut req = client
121 .post(&url)
122 .header("Content-Type", "application/json")
123 .header("Authorization", format!("Bearer {token}"));
124
125 if !shared_secret.is_empty() {
128 use sha2::{Digest, Sha256};
129 let body_hash = hex::encode(Sha256::digest(&body_bytes));
130 let timestamp = chrono::Utc::now().timestamp().to_string();
131 let message = format!("{timestamp}:POST:/api/heartbeat:{body_hash}");
132 if let Ok(sig) = crate::auth::compute_hmac(shared_secret.as_bytes(), message.as_bytes()) {
133 req = req
134 .header("X-Mesh-Timestamp", ×tamp)
135 .header("X-Mesh-Signature", hex::encode(sig))
136 .header("X-Mesh-Body-Hash", &body_hash);
137 }
138 }
139
140 match req.body(body_bytes).send() {
141 Ok(r) if r.status().is_success() => {
142 info!(addr, "heartbeat sent");
143 }
144 Ok(r) => {
145 let status = r.status();
146 let body = r.text().unwrap_or_default();
147 warn!(addr, %status, body, "heartbeat rejected");
148 }
149 Err(e) => warn!(addr, "heartbeat failed: {e}"),
150 }
151}
152
153fn local_node_name() -> String {
154 hostname::get()
155 .map(|h| h.to_string_lossy().to_string())
156 .unwrap_or_else(|_| "unknown".into())
157}
158
159fn peer_to_fields(peer: &PeerConfig) -> HashMap<String, String> {
160 let mut map = HashMap::new();
161 if let Some(ip) = &peer.thunderbolt_ip {
162 map.insert("thunderbolt_ip".into(), ip.clone());
163 }
164 if let Some(ip) = &peer.lan_ip {
165 map.insert("lan_ip".into(), ip.clone());
166 }
167 if !peer.tailscale_ip.is_empty() {
168 map.insert("tailscale_ip".into(), peer.tailscale_ip.clone());
169 }
170 map
171}
172
#[cfg(test)]
mod tests {
    use super::*;
    use crate::peers_types::PeerConfig;

    /// Builds an "active" "worker" peer whose only transport is the given
    /// Tailscale IP; tests opt in to the other transports explicitly.
    fn make_peer(tailscale_ip: &str) -> PeerConfig {
        PeerConfig {
            ssh_alias: "test".into(),
            user: "user".into(),
            os: "linux".into(),
            tailscale_ip: tailscale_ip.into(),
            dns_name: "test.ts.net".into(),
            capabilities: vec![],
            role: "worker".into(),
            status: "active".into(),
            thunderbolt_ip: None,
            lan_ip: None,
            mac_address: None,
            gh_account: None,
            runners: None,
            runner_paths: None,
            repo_path: None,
            aliases: vec![],
        }
    }

    #[test]
    fn peer_to_fields_includes_tailscale() {
        let peer = make_peer("100.0.0.1");
        let fields = peer_to_fields(&peer);
        assert_eq!(
            fields.get("tailscale_ip").map(String::as_str),
            Some("100.0.0.1")
        );
    }

    #[test]
    fn peer_to_fields_thunderbolt_absent() {
        let peer = make_peer("100.0.0.2");
        let fields = peer_to_fields(&peer);
        assert!(!fields.contains_key("thunderbolt_ip"));
        assert!(!fields.contains_key("lan_ip"));
        assert_eq!(fields.len(), 1);
    }

    #[test]
    fn peer_to_fields_skips_empty_tailscale() {
        let peer = make_peer("");
        assert!(peer_to_fields(&peer).is_empty());
    }

    #[test]
    fn peer_to_fields_with_all_transports() {
        let mut peer = make_peer("100.0.0.3");
        peer.thunderbolt_ip = Some("10.0.0.1".into());
        peer.lan_ip = Some("192.168.1.1".into());
        let fields = peer_to_fields(&peer);
        // Check the exact mapping, not just the count, so a swapped key or
        // value would be caught.
        assert_eq!(fields.len(), 3);
        assert_eq!(
            fields.get("thunderbolt_ip").map(String::as_str),
            Some("10.0.0.1")
        );
        assert_eq!(
            fields.get("lan_ip").map(String::as_str),
            Some("192.168.1.1")
        );
        assert_eq!(
            fields.get("tailscale_ip").map(String::as_str),
            Some("100.0.0.3")
        );
    }
}