volli_manager/connection/
join.rs1use crate::bootstrap::{SigningInit, init_cert};
7use crate::config::ServerConfigOpts;
8use crate::connection::{ManagerContext, mesh::PeerDialRequest};
9use crate::keys::secret_dir;
10use crate::peers::update_alive;
11use crate::{ManagerPeerEntry, add_peer, update_profile};
12use base64::Engine;
13use eyre::Report;
14use sha2::{Digest, Sha256};
15use std::sync::atomic::Ordering;
16use tracing::info;
17use volli_core::token::{decode_token, verify_token};
18use volli_core::{Message, WorkerConfig};
19use volli_transport::MessageTransportExt;
20use volli_transport::Transport;
21
/// Local settings for a manager that is joining an existing cluster.
///
/// All fields are borrowed or plain integers, so the struct is cheap to copy
/// and can be printed with `{:?}` for diagnostics.
#[derive(Debug, Clone, Copy)]
pub struct JoinConfig<'a> {
    /// Host under which this manager is reachable. `join_as_client` uses it
    /// for certificate initialisation, for the `host` of the announced peer
    /// entry, and for the saved profile.
    pub advertise_host: &'a str,
    /// TCP port announced to peers and stored in the profile.
    pub tcp_port: u16,
    /// QUIC port announced to peers and stored in the profile.
    pub quic_port: u16,
    /// Optional manager name. When `None`, `join_as_client` generates a
    /// random one and falls back to `"joining-manager"` if generation fails.
    pub manager_name: Option<&'a str>,
    /// Bind host written to the profile by `join_as_client`.
    pub bind_host: &'a str,
}
30
31pub async fn join_as_client(
36 join_peer: &ManagerPeerEntry,
37 token: &str,
38 profile: &str,
39 config: &JoinConfig<'_>,
40) -> Result<ManagerPeerEntry, Report> {
41 info!(
42 "Attempting to join via manager {} at {}:{}",
43 join_peer.manager_name, join_peer.host, join_peer.quic_port
44 );
45
46 let join_cfg = WorkerConfig {
47 role: volli_core::Role::Manager,
48 host: join_peer.host.clone(),
49 tcp_port: join_peer.tcp_port,
50 quic_port: join_peer.quic_port,
51 token: token.to_string(),
52 cert: base64::engine::general_purpose::STANDARD_NO_PAD
53 .decode(&join_peer.tls_cert)
54 .map_err(|e| eyre::eyre!("Failed to decode join target certificate: {}", e))?,
55 fingerprint: join_peer.tls_fp.clone(),
56 ..Default::default()
57 };
58
59 let mut transport = crate::connection::mesh::try_connect_with_fallback(&join_cfg, &None)
60 .await
61 .map_err(|e| eyre::eyre!("Failed to connect for join: {}", e))?
62 .0;
63
64 transport
65 .send(&Message::Join {
66 token: token.to_string(),
67 })
68 .await?;
69
70 info!("Awaiting join response");
71 match transport.recv().await? {
72 Some(Message::JoinResponse {
73 ver,
74 csk: new_csk,
75 peer,
76 }) => {
77 info!("Received join response with cluster key version {}", ver);
78
79 if let Err(e) = update_profile(profile)?
81 .cluster_key(&new_csk, ver, true)
82 .save_all()
83 {
84 tracing::error!("Failed to save cluster key: {}", e);
85 return Err(e);
86 }
87
88 let secret_dir = secret_dir(Some(profile));
90 std::fs::create_dir_all(&secret_dir)?;
91
92 let mut show_bootstrap = false;
93 let SigningInit {
94 id: manager_id,
95 key: signing_key,
96 newly_generated: persist_keys,
97 sk_path,
98 pk_path,
99 ..
100 } = crate::bootstrap::init_signing(secret_dir.as_path(), &mut show_bootstrap)?;
101
102 let cert_init = init_cert(
103 &ServerConfigOpts {
104 advertise_host: config.advertise_host.to_string(),
105 tcp_port: config.tcp_port,
106 quic_port: config.quic_port,
107 ..Default::default()
108 },
109 secret_dir.as_path(),
110 )?;
111
112 let manager_name = config
113 .manager_name
114 .map(|s| s.to_string())
115 .unwrap_or_else(|| {
116 volli_core::namegen::random_profile()
117 .unwrap_or_else(|_| "joining-manager".to_string())
118 });
119
120 let token_dec = volli_core::token::decode_token(token)?;
122
123 let self_peer = ManagerPeerEntry {
124 manager_id: manager_id.clone(),
125 manager_name: manager_name.clone(),
126 tenant: token_dec.payload.tenant.clone(),
127 cluster: token_dec.payload.cluster.clone(),
128 host: config.advertise_host.to_string(),
129 tcp_port: config.tcp_port,
130 quic_port: config.quic_port,
131 pub_fp: hex::encode(&cert_init.fingerprint),
132 csk_ver: ver,
133 tls_cert: base64::engine::general_purpose::STANDARD_NO_PAD
134 .encode(&cert_init.cert_der),
135 tls_fp: hex::encode(Sha256::digest(&cert_init.cert_der)),
136 health: None, };
138
139 update_profile(profile)?
141 .name(&manager_name)
142 .host(config.advertise_host)
143 .bind_host(config.bind_host)
144 .tcp_port(config.tcp_port)
145 .quic_port(config.quic_port)
146 .signing_key(&signing_key, persist_keys, sk_path, pk_path)
147 .certificate(
148 &cert_init.cert_der,
149 &cert_init.key_der,
150 cert_init.newly_generated,
151 cert_init.cert_path.clone(),
152 cert_init.key_path.clone(),
153 )
154 .cluster_key(&new_csk, ver, true) .secret_dir(secret_dir)
156 .save_all()?;
157
158 transport
160 .send(&Message::Announce {
161 meta: Box::new(self_peer.clone()),
162 version: 0, peers: vec![], workers: vec![], })
166 .await?;
167
168 transport.flush().await?;
170
171 match transport.recv().await? {
173 Some(Message::AuthOk) => {
174 tracing::debug!("Received acknowledgment for join announcement");
175 }
176 Some(other) => {
177 tracing::warn!("Expected acknowledgment, got: {:?}", other);
178 }
179 None => {
180 tracing::warn!("Connection closed before receiving acknowledgment");
181 }
182 }
183
184 info!(
185 "Join process completed successfully, sent self announcement: {}",
186 manager_id
187 );
188 info!("Discovered real peer: {}", peer.manager_id);
189 if let Err(e) = add_peer(profile, (*peer).clone()) {
193 tracing::warn!("Failed to add join peer to storage: {}", e);
194 }
195 Ok(*peer)
196 }
197 Some(_) => {
198 tracing::info!("Received unexpected message");
199 Err(eyre::eyre!("Did not receive expected join response"))
200 }
201 _ => Err(eyre::eyre!("Did not receive expected join response")),
202 }
203}
204
205pub async fn join_as_server(
210 ctx: &ManagerContext,
211 mut transport: Box<dyn Transport>,
212 peer: std::net::SocketAddr,
213 token: String,
214) -> Result<(), Report> {
215 let csk = *ctx.security.csk.read().await;
216 let csk_ver = ctx.security.csk_ver.load(Ordering::SeqCst);
217 let whitelist = ctx.network.manager_nets.clone();
218
219 let token_dec = match decode_token(&token) {
221 Ok(t) => t,
222 Err(e) => {
223 tracing::error!("Join token decode failed for peer {}: {}", peer, e);
224 transport.send(&Message::AuthErr).await.ok();
225 return Ok(());
226 }
227 };
228
229 if token_dec.payload.tenant != ctx.state.self_meta.tenant
230 || token_dec.payload.cluster != ctx.state.self_meta.cluster
231 {
232 tracing::warn!(
233 %peer,
234 token_tenant = %token_dec.payload.tenant,
235 token_cluster = %token_dec.payload.cluster,
236 expected_tenant = %ctx.state.self_meta.tenant,
237 expected_cluster = %ctx.state.self_meta.cluster,
238 "join token tenant/cluster mismatch"
239 );
240 transport.send(&Message::AuthErr).await.ok();
241 return Ok(());
242 }
243
244 if !whitelist.is_empty() {
246 let peer_addr = peer.ip();
247
248 if !whitelist.iter().any(|net| net.contains(&peer_addr)) {
249 tracing::warn!(%peer, "join connection from non-whitelisted address");
250 transport.send(&Message::AuthErr).await.ok();
251 return Ok(());
252 }
253 }
254
255 if let Err(e) = verify_token(&token_dec, &csk) {
256 tracing::error!("Join token verification failed for peer {}: {}", peer, e);
257 transport.send(&Message::AuthErr).await.ok();
258 return Err(e);
259 }
260
261 info!(target: "connection", %peer, "join connection established");
262
263 if let Err(e) = transport
264 .send(&Message::JoinResponse {
265 ver: csk_ver,
266 csk,
267 peer: Box::new(ctx.state.self_meta.clone()),
268 })
269 .await
270 {
271 tracing::error!("Failed to send join response to {}: {}", peer, e);
272 return Err(e.into());
273 }
274
275 tracing::debug!("Join exchange completed, CSK provided to joining peer");
276 if let Err(e) = transport.flush().await {
278 tracing::warn!("Failed to flush transport after join response: {}", e);
279 }
280
281 tracing::debug!("Waiting for joining manager to announce itself: {}", peer);
283 match transport.recv().await {
284 Ok(Some(Message::Announce { meta, .. })) => {
285 let joining_peer = *meta;
286 tracing::info!(
287 "Received peer announcement from joining manager: {} ({}:{})",
288 joining_peer.manager_id,
289 joining_peer.host,
290 joining_peer.quic_port
291 );
292
293 if let Err(e) = add_peer(&ctx.communication.profile, joining_peer.clone()) {
295 tracing::warn!("Failed to add joining peer to storage: {}", e);
296 }
297
298 update_alive(
300 &ctx.state.peers,
301 &ctx.communication.alive_tx,
302 &ctx.communication.profile,
303 joining_peer.clone(),
304 &ctx.state.peer_version,
305 )
306 .await;
307
308 if ctx
310 .communication
311 .dial_tx
312 .send(PeerDialRequest {
313 peer: joining_peer.clone(),
314 gossip_hint: false,
315 })
316 .is_err()
317 {
318 tracing::warn!(
319 "Failed to enqueue joining peer for dial: {}",
320 joining_peer.manager_id
321 );
322 } else {
323 tracing::debug!(
324 "Enqueued joining peer for mesh connection: {} ({}:{})",
325 joining_peer.manager_id,
326 joining_peer.host,
327 joining_peer.quic_port
328 );
329 }
330
331 if let Err(e) = transport.send(&Message::AuthOk).await {
333 tracing::warn!("Failed to send join announcement acknowledgment: {}", e);
334 } else {
335 tracing::debug!("Sent acknowledgment for join announcement");
336 }
337 }
338 Ok(Some(msg)) => {
339 tracing::warn!("Expected Announce from joining client, got: {:?}", msg);
340 }
341 Ok(None) => {
342 tracing::debug!(
343 "Join client closed connection before announcing itself: {}",
344 peer
345 );
346 }
347 Err(_) => {
348 tracing::debug!(
349 "Join client disconnected before announcing itself: {}",
350 peer
351 );
352 }
353 }
354
355 tracing::debug!("Waiting for join client to close connection: {}", peer);
357 match transport.recv().await {
358 Ok(None) => {
359 tracing::debug!("Join client closed connection cleanly: {}", peer);
360 }
361 Ok(Some(msg)) => {
362 tracing::warn!(
363 "Unexpected message from join client after announce: {:?}",
364 msg
365 );
366 }
367 Err(_) => {
368 tracing::debug!("Join client disconnected: {}", peer);
369 }
370 }
371 Ok(())
372}