volli_manager/connection/
join.rs

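//! Join functionality for manager-to-manager connections.
//!
//! This module keeps the client-side and server-side join logic together so the
//! two halves of the join exchange protocol are easy to compare and maintain.
//!
//! The exchange is: the joining manager sends `Join { token }`; the existing
//! manager validates the token and replies with `JoinResponse` (cluster key, key
//! version, and its own metadata); the joiner then sends `Announce` with its own
//! metadata, which the existing manager acknowledges with `AuthOk`.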
5
6use crate::bootstrap::{SigningInit, init_cert};
7use crate::config::ServerConfigOpts;
8use crate::connection::{ManagerContext, mesh::PeerDialRequest};
9use crate::keys::secret_dir;
10use crate::peers::update_alive;
11use crate::{ManagerPeerEntry, add_peer, update_profile};
12use base64::Engine;
13use eyre::Report;
14use sha2::{Digest, Sha256};
15use std::sync::atomic::Ordering;
16use tracing::info;
17use volli_core::token::{decode_token, verify_token};
18use volli_core::{Message, WorkerConfig};
19use volli_transport::MessageTransportExt;
20use volli_transport::Transport;
21
/// Configuration for the join process.
///
/// Bundles the local network settings that a joining manager advertises to the
/// cluster and persists into its profile once the join exchange succeeds.
#[derive(Debug, Clone, Copy)]
pub struct JoinConfig<'a> {
    /// Host other managers are told to use to reach this manager; stored as the
    /// `host` of the announced peer entry and of the profile.
    pub advertise_host: &'a str,
    /// Advertised TCP port, used for the announced peer entry, the certificate
    /// options, and the profile.
    pub tcp_port: u16,
    /// Advertised QUIC port, used for the announced peer entry, the certificate
    /// options, and the profile.
    pub quic_port: u16,
    /// Explicit manager name; when `None`, a random name is generated during join.
    pub manager_name: Option<&'a str>,
    /// Local bind host, persisted to the profile during join.
    pub bind_host: &'a str,
}
30
31/// Client-side join functionality.
32///
33/// Connects to an existing manager using a bootstrap token and exchanges
34/// credentials to join the cluster mesh.
35pub async fn join_as_client(
36    join_peer: &ManagerPeerEntry,
37    token: &str,
38    profile: &str,
39    config: &JoinConfig<'_>,
40) -> Result<ManagerPeerEntry, Report> {
41    info!(
42        "Attempting to join via manager {} at {}:{}",
43        join_peer.manager_name, join_peer.host, join_peer.quic_port
44    );
45
46    let join_cfg = WorkerConfig {
47        role: volli_core::Role::Manager,
48        host: join_peer.host.clone(),
49        tcp_port: join_peer.tcp_port,
50        quic_port: join_peer.quic_port,
51        token: token.to_string(),
52        cert: base64::engine::general_purpose::STANDARD_NO_PAD
53            .decode(&join_peer.tls_cert)
54            .map_err(|e| eyre::eyre!("Failed to decode join target certificate: {}", e))?,
55        fingerprint: join_peer.tls_fp.clone(),
56        ..Default::default()
57    };
58
59    let mut transport = crate::connection::mesh::try_connect_with_fallback(&join_cfg, &None)
60        .await
61        .map_err(|e| eyre::eyre!("Failed to connect for join: {}", e))?
62        .0;
63
64    transport
65        .send(&Message::Join {
66            token: token.to_string(),
67        })
68        .await?;
69
70    info!("Awaiting join response");
71    match transport.recv().await? {
72        Some(Message::JoinResponse {
73            ver,
74            csk: new_csk,
75            peer,
76        }) => {
77            info!("Received join response with cluster key version {}", ver);
78
79            // Save cluster key using builder pattern for consistency
80            if let Err(e) = update_profile(profile)?
81                .cluster_key(&new_csk, ver, true)
82                .save_all()
83            {
84                tracing::error!("Failed to save cluster key: {}", e);
85                return Err(e);
86            }
87
88            // Now that join succeeded, generate our real peer metadata and send Announce
89            let secret_dir = secret_dir(Some(profile));
90            std::fs::create_dir_all(&secret_dir)?;
91
92            let mut show_bootstrap = false;
93            let SigningInit {
94                id: manager_id,
95                key: signing_key,
96                newly_generated: persist_keys,
97                sk_path,
98                pk_path,
99                ..
100            } = crate::bootstrap::init_signing(secret_dir.as_path(), &mut show_bootstrap)?;
101
102            let cert_init = init_cert(
103                &ServerConfigOpts {
104                    advertise_host: config.advertise_host.to_string(),
105                    tcp_port: config.tcp_port,
106                    quic_port: config.quic_port,
107                    ..Default::default()
108                },
109                secret_dir.as_path(),
110            )?;
111
112            let manager_name = config
113                .manager_name
114                .map(|s| s.to_string())
115                .unwrap_or_else(|| {
116                    volli_core::namegen::random_profile()
117                        .unwrap_or_else(|_| "joining-manager".to_string())
118                });
119
120            // Decode token to get tenant/cluster info
121            let token_dec = volli_core::token::decode_token(token)?;
122
123            let self_peer = ManagerPeerEntry {
124                manager_id: manager_id.clone(),
125                manager_name: manager_name.clone(),
126                tenant: token_dec.payload.tenant.clone(),
127                cluster: token_dec.payload.cluster.clone(),
128                host: config.advertise_host.to_string(),
129                tcp_port: config.tcp_port,
130                quic_port: config.quic_port,
131                pub_fp: hex::encode(&cert_init.fingerprint),
132                csk_ver: ver,
133                tls_cert: base64::engine::general_purpose::STANDARD_NO_PAD
134                    .encode(&cert_init.cert_der),
135                tls_fp: hex::encode(Sha256::digest(&cert_init.cert_der)),
136                health: None, // Health metrics will be populated later
137            };
138
139            // Use unified profile builder to persist all data in batched operations
140            update_profile(profile)?
141                .name(&manager_name)
142                .host(config.advertise_host)
143                .bind_host(config.bind_host)
144                .tcp_port(config.tcp_port)
145                .quic_port(config.quic_port)
146                .signing_key(&signing_key, persist_keys, sk_path, pk_path)
147                .certificate(
148                    &cert_init.cert_der,
149                    &cert_init.key_der,
150                    cert_init.newly_generated,
151                    cert_init.cert_path.clone(),
152                    cert_init.key_path.clone(),
153                )
154                .cluster_key(&new_csk, ver, true) // Always persist CSK during join
155                .secret_dir(secret_dir)
156                .save_all()?;
157
158            // Send Announce message with our peer metadata
159            transport
160                .send(&Message::Announce {
161                    meta: Box::new(self_peer.clone()),
162                    version: 0,      // Initial version
163                    peers: vec![],   // Empty on first announce
164                    workers: vec![], // Empty on first announce
165                })
166                .await?;
167
168            // Flush to ensure Announce message is transmitted before connection closes
169            transport.flush().await?;
170
171            // Wait for acknowledgment that the announcement was processed
172            match transport.recv().await? {
173                Some(Message::AuthOk) => {
174                    tracing::debug!("Received acknowledgment for join announcement");
175                }
176                Some(other) => {
177                    tracing::warn!("Expected acknowledgment, got: {:?}", other);
178                }
179                None => {
180                    tracing::warn!("Connection closed before receiving acknowledgment");
181                }
182            }
183
184            info!(
185                "Join process completed successfully, sent self announcement: {}",
186                manager_id
187            );
188            info!("Discovered real peer: {}", peer.manager_id);
189            // Persist the join target as a known peer immediately so that any
190            // workers connecting to this manager can receive full peer lists
191            // even before the mesh finishes exchanging gossip.
192            if let Err(e) = add_peer(profile, (*peer).clone()) {
193                tracing::warn!("Failed to add join peer to storage: {}", e);
194            }
195            Ok(*peer)
196        }
197        Some(_) => {
198            tracing::info!("Received unexpected message");
199            Err(eyre::eyre!("Did not receive expected join response"))
200        }
201        _ => Err(eyre::eyre!("Did not receive expected join response")),
202    }
203}
204
205/// Server-side join handling functionality.
206///
207/// Handles an incoming join request, validates the token, sends the cluster key,
208/// and processes the joining manager's announcement.
209pub async fn join_as_server(
210    ctx: &ManagerContext,
211    mut transport: Box<dyn Transport>,
212    peer: std::net::SocketAddr,
213    token: String,
214) -> Result<(), Report> {
215    let csk = *ctx.security.csk.read().await;
216    let csk_ver = ctx.security.csk_ver.load(Ordering::SeqCst);
217    let whitelist = ctx.network.manager_nets.clone();
218
219    // Join connection - authenticate and send JoinResponse
220    let token_dec = match decode_token(&token) {
221        Ok(t) => t,
222        Err(e) => {
223            tracing::error!("Join token decode failed for peer {}: {}", peer, e);
224            transport.send(&Message::AuthErr).await.ok();
225            return Ok(());
226        }
227    };
228
229    if token_dec.payload.tenant != ctx.state.self_meta.tenant
230        || token_dec.payload.cluster != ctx.state.self_meta.cluster
231    {
232        tracing::warn!(
233            %peer,
234            token_tenant = %token_dec.payload.tenant,
235            token_cluster = %token_dec.payload.cluster,
236            expected_tenant = %ctx.state.self_meta.tenant,
237            expected_cluster = %ctx.state.self_meta.cluster,
238            "join token tenant/cluster mismatch"
239        );
240        transport.send(&Message::AuthErr).await.ok();
241        return Ok(());
242    }
243
244    // Check whitelist
245    if !whitelist.is_empty() {
246        let peer_addr = peer.ip();
247
248        if !whitelist.iter().any(|net| net.contains(&peer_addr)) {
249            tracing::warn!(%peer, "join connection from non-whitelisted address");
250            transport.send(&Message::AuthErr).await.ok();
251            return Ok(());
252        }
253    }
254
255    if let Err(e) = verify_token(&token_dec, &csk) {
256        tracing::error!("Join token verification failed for peer {}: {}", peer, e);
257        transport.send(&Message::AuthErr).await.ok();
258        return Err(e);
259    }
260
261    info!(target: "connection", %peer, "join connection established");
262
263    if let Err(e) = transport
264        .send(&Message::JoinResponse {
265            ver: csk_ver,
266            csk,
267            peer: Box::new(ctx.state.self_meta.clone()),
268        })
269        .await
270    {
271        tracing::error!("Failed to send join response to {}: {}", peer, e);
272        return Err(e.into());
273    }
274
275    tracing::debug!("Join exchange completed, CSK provided to joining peer");
276    // Flush transport to ensure JoinResponse is transmitted before connection closes
277    if let Err(e) = transport.flush().await {
278        tracing::warn!("Failed to flush transport after join response: {}", e);
279    }
280
281    // Wait for joining manager to send Announce with their peer metadata
282    tracing::debug!("Waiting for joining manager to announce itself: {}", peer);
283    match transport.recv().await {
284        Ok(Some(Message::Announce { meta, .. })) => {
285            let joining_peer = *meta;
286            tracing::info!(
287                "Received peer announcement from joining manager: {} ({}:{})",
288                joining_peer.manager_id,
289                joining_peer.host,
290                joining_peer.quic_port
291            );
292
293            // Add joining peer to our peer list and enqueue for dial
294            if let Err(e) = add_peer(&ctx.communication.profile, joining_peer.clone()) {
295                tracing::warn!("Failed to add joining peer to storage: {}", e);
296            }
297
298            // Update the in-memory peer table and increment version counter
299            update_alive(
300                &ctx.state.peers,
301                &ctx.communication.alive_tx,
302                &ctx.communication.profile,
303                joining_peer.clone(),
304                &ctx.state.peer_version,
305            )
306            .await;
307
308            // Enqueue the joining peer for dial to establish mesh connection
309            if ctx
310                .communication
311                .dial_tx
312                .send(PeerDialRequest {
313                    peer: joining_peer.clone(),
314                    gossip_hint: false,
315                })
316                .is_err()
317            {
318                tracing::warn!(
319                    "Failed to enqueue joining peer for dial: {}",
320                    joining_peer.manager_id
321                );
322            } else {
323                tracing::debug!(
324                    "Enqueued joining peer for mesh connection: {} ({}:{})",
325                    joining_peer.manager_id,
326                    joining_peer.host,
327                    joining_peer.quic_port
328                );
329            }
330
331            // Send acknowledgment to confirm we processed the announcement
332            if let Err(e) = transport.send(&Message::AuthOk).await {
333                tracing::warn!("Failed to send join announcement acknowledgment: {}", e);
334            } else {
335                tracing::debug!("Sent acknowledgment for join announcement");
336            }
337        }
338        Ok(Some(msg)) => {
339            tracing::warn!("Expected Announce from joining client, got: {:?}", msg);
340        }
341        Ok(None) => {
342            tracing::debug!(
343                "Join client closed connection before announcing itself: {}",
344                peer
345            );
346        }
347        Err(_) => {
348            tracing::debug!(
349                "Join client disconnected before announcing itself: {}",
350                peer
351            );
352        }
353    }
354
355    // Wait for client to close the connection gracefully after announcement
356    tracing::debug!("Waiting for join client to close connection: {}", peer);
357    match transport.recv().await {
358        Ok(None) => {
359            tracing::debug!("Join client closed connection cleanly: {}", peer);
360        }
361        Ok(Some(msg)) => {
362            tracing::warn!(
363                "Unexpected message from join client after announce: {:?}",
364                msg
365            );
366        }
367        Err(_) => {
368            tracing::debug!("Join client disconnected: {}", peer);
369        }
370    }
371    Ok(())
372}