Skip to main content

convergio_mesh/
sync_loop.rs

1//! Background mesh sync loop.
2//!
3//! Spawns a tokio task that periodically syncs all SYNC_TABLES
4//! with every active peer in the peers registry.
5
6use std::collections::HashMap;
7use std::time::{Duration, Instant};
8
9use convergio_db::pool::ConnPool;
10use tracing::{error, info, warn};
11
12use crate::peers_registry::peers_conf_path_from_env;
13use crate::peers_types::{PeerConfig, PeersRegistry};
14use crate::sync::sync_table_with_peer;
15use crate::transport::{resolve_best_addr, update_mesh_sync_stats};
16use crate::types::SYNC_TABLES;
17
18/// Spawn background tokio task: every `interval`, sync all tables with all active peers.
19pub fn spawn_sync_loop(pool: ConnPool, interval: Duration) {
20    tokio::spawn(async move {
21        info!(interval_secs = interval.as_secs(), "mesh sync loop started");
22        loop {
23            tokio::time::sleep(interval).await;
24            let pool_clone = pool.clone();
25            if let Err(e) = tokio::task::spawn_blocking(move || run_sync_round(&pool_clone)).await {
26                error!("sync round task panicked: {e}");
27            }
28        }
29    });
30}
31
32fn run_sync_round(pool: &ConnPool) {
33    let conf_path = std::path::PathBuf::from(peers_conf_path_from_env());
34    let registry = match PeersRegistry::load(&conf_path) {
35        Ok(r) => r,
36        Err(e) => {
37            warn!("peers registry load failed: {e}");
38            return;
39        }
40    };
41
42    let conn = match pool.get() {
43        Ok(c) => c,
44        Err(e) => {
45            error!("db pool get failed: {e}");
46            return;
47        }
48    };
49
50    let local_ts = crate::transport::detect_local_tailscale_ip();
51
52    for (name, peer) in registry.list_active() {
53        // Skip self (match by Tailscale IP)
54        if let Some(ref lip) = local_ts {
55            if peer.tailscale_ip == *lip {
56                continue;
57            }
58        }
59        let fields = peer_to_fields(peer);
60        let Some(addr) = resolve_best_addr(name, &fields) else {
61            warn!(peer = name, "no reachable address, skipping");
62            continue;
63        };
64
65        let started = Instant::now();
66        let mut total_sent = 0usize;
67        let mut total_received = 0usize;
68        let mut total_applied = 0usize;
69
70        for table in SYNC_TABLES {
71            let (sent, received, applied) = sync_table_with_peer(&conn, &addr, table);
72            total_sent += sent;
73            total_received += received;
74            total_applied += applied;
75            if sent > 0 || received > 0 || applied > 0 {
76                info!(peer = name, table, sent, received, applied, "table synced");
77            }
78        }
79
80        let latency_ms = started.elapsed().as_millis() as i64;
81        update_mesh_sync_stats(
82            &conn,
83            &addr,
84            total_sent,
85            total_received,
86            total_applied,
87            latency_ms,
88        );
89
90        info!(
91            peer = name,
92            addr = %addr,
93            sent = total_sent,
94            received = total_received,
95            applied = total_applied,
96            latency_ms,
97            "sync round complete"
98        );
99
100        // Send heartbeat so the peer knows we're online
101        send_heartbeat_to_peer(&addr, &registry.shared_secret);
102    }
103}
104
105fn send_heartbeat_to_peer(addr: &str, shared_secret: &str) {
106    let url = format!("http://{addr}/api/heartbeat");
107    let role = std::env::var("CONVERGIO_NODE_ROLE").unwrap_or_default();
108    let body = serde_json::json!({
109        "peer": local_node_name(),
110        "version": env!("CARGO_PKG_VERSION"),
111        "role": role,
112    });
113    let body_bytes = serde_json::to_vec(&body).unwrap_or_default();
114
115    let client = reqwest::blocking::Client::builder()
116        .timeout(Duration::from_secs(5))
117        .build();
118    let Ok(client) = client else { return };
119    let token = std::env::var("CONVERGIO_AUTH_TOKEN").unwrap_or_else(|_| "dev-local".into());
120    let mut req = client
121        .post(&url)
122        .header("Content-Type", "application/json")
123        .header("Authorization", format!("Bearer {token}"));
124
125    // Sign with HMAC using the same timestamp:method:path:bodyhash protocol
126    // that the receiver expects (matching apply_mesh_auth in transport.rs).
127    if !shared_secret.is_empty() {
128        use sha2::{Digest, Sha256};
129        let body_hash = hex::encode(Sha256::digest(&body_bytes));
130        let timestamp = chrono::Utc::now().timestamp().to_string();
131        let message = format!("{timestamp}:POST:/api/heartbeat:{body_hash}");
132        if let Ok(sig) = crate::auth::compute_hmac(shared_secret.as_bytes(), message.as_bytes()) {
133            req = req
134                .header("X-Mesh-Timestamp", &timestamp)
135                .header("X-Mesh-Signature", hex::encode(sig))
136                .header("X-Mesh-Body-Hash", &body_hash);
137        }
138    }
139
140    match req.body(body_bytes).send() {
141        Ok(r) if r.status().is_success() => {
142            info!(addr, "heartbeat sent");
143        }
144        Ok(r) => {
145            let status = r.status();
146            let body = r.text().unwrap_or_default();
147            warn!(addr, %status, body, "heartbeat rejected");
148        }
149        Err(e) => warn!(addr, "heartbeat failed: {e}"),
150    }
151}
152
153fn local_node_name() -> String {
154    hostname::get()
155        .map(|h| h.to_string_lossy().to_string())
156        .unwrap_or_else(|_| "unknown".into())
157}
158
159fn peer_to_fields(peer: &PeerConfig) -> HashMap<String, String> {
160    let mut map = HashMap::new();
161    if let Some(ip) = &peer.thunderbolt_ip {
162        map.insert("thunderbolt_ip".into(), ip.clone());
163    }
164    if let Some(ip) = &peer.lan_ip {
165        map.insert("lan_ip".into(), ip.clone());
166    }
167    if !peer.tailscale_ip.is_empty() {
168        map.insert("tailscale_ip".into(), peer.tailscale_ip.clone());
169    }
170    map
171}
172
#[cfg(test)]
mod tests {
    use super::*;
    use crate::peers_types::PeerConfig;

    /// A minimal active worker `PeerConfig` with only `tailscale_ip` varying and
    /// every optional transport/metadata field left unset.
    fn make_peer(tailscale_ip: &str) -> PeerConfig {
        PeerConfig {
            ssh_alias: "test".into(),
            user: "user".into(),
            os: "linux".into(),
            tailscale_ip: tailscale_ip.into(),
            dns_name: "test.ts.net".into(),
            capabilities: vec![],
            role: "worker".into(),
            status: "active".into(),
            thunderbolt_ip: None,
            lan_ip: None,
            mac_address: None,
            gh_account: None,
            runners: None,
            runner_paths: None,
            repo_path: None,
            aliases: vec![],
        }
    }

    #[test]
    fn peer_to_fields_includes_tailscale() {
        let fields = peer_to_fields(&make_peer("100.0.0.1"));
        let tailscale = fields.get("tailscale_ip").map(String::as_str);
        assert_eq!(tailscale, Some("100.0.0.1"));
    }

    #[test]
    fn peer_to_fields_thunderbolt_absent() {
        let fields = peer_to_fields(&make_peer("100.0.0.2"));
        assert!(fields.get("thunderbolt_ip").is_none());
    }

    #[test]
    fn peer_to_fields_with_all_transports() {
        let peer = PeerConfig {
            thunderbolt_ip: Some("10.0.0.1".into()),
            lan_ip: Some("192.168.1.1".into()),
            ..make_peer("100.0.0.3")
        };
        assert_eq!(peer_to_fields(&peer).len(), 3);
    }
}