1#![cfg_attr(test, allow(unused_crate_dependencies))]
2
3use crate::env_config::config_dir_env;
4use serde::{Deserialize, Serialize};
5use std::path::PathBuf;
6
7pub fn config_dir() -> PathBuf {
9 if let Some(dir) = config_dir_env() {
10 PathBuf::from(dir)
11 } else if let Some(mut path) = dirs_next::config_dir() {
12 path.push("volli");
13 path
14 } else {
15 dirs_next::home_dir()
16 .unwrap_or_else(|| PathBuf::from("."))
17 .join(".volli")
18 }
19}
20
/// Default port number for the TCP transport.
pub const DEFAULT_TCP_PORT: u16 = 4242;

/// Default port number for the QUIC transport (same number as the TCP default).
pub const DEFAULT_QUIC_PORT: u16 = 4242;
25pub mod codec;
26pub mod env_config;
27pub mod handshake;
28pub mod namegen;
29pub mod peer_db;
30pub mod peer_store;
31pub mod profile;
32pub mod token;
33pub mod util;
34pub mod worker;
35
36pub use env_config::{
37 ConfigDirGuard, ConfigGuard, EnvironmentConfig, env_config, override_config_dir,
38 override_env_config, override_env_config_patch,
39};
40pub use worker::{Protocol, Role, WorkerConfig};
41
42#[derive(Debug, Serialize, Deserialize)]
44pub enum Message {
45 Ping {
46 version: u64,
47 },
48 Pong {
49 mac: String,
50 version: u64,
51 },
52 Auth {
53 token: String,
54 #[serde(default)]
55 worker_id: Option<String>,
56 #[serde(default)]
57 worker_name: Option<String>,
58 },
59 Join {
60 token: String,
61 },
62 AuthOk,
63 AuthErr,
64 TokenRefreshRequest {
65 token: String,
66 },
67 TokenRefreshOk {
68 token: String,
69 },
70 TokenRefreshErr {
71 reason: String,
72 },
73 Hello {
74 manager_id: String,
75 nonce: [u8; 32],
76 sig: Vec<u8>,
77 },
78 Welcome {
79 manager_id: String,
80 nonce: [u8; 32],
81 sig: Vec<u8>,
82 },
83 ClusterKey {
84 ver: u32,
85 csk: [u8; 32],
86 },
87 JoinResponse {
89 ver: u32,
90 csk: [u8; 32],
91 peer: Box<ManagerPeerEntry>,
92 },
93 ClientAuth {
95 token: String,
96 },
97 ClientCommandRequest {
99 request_id: String,
100 command: String,
101 args: Vec<String>,
102 target: String, timeout_secs: u64,
104 options: String, },
106 ClientCommandResponse {
108 request_id: String,
109 worker_id: String,
110 worker_name: Option<String>,
111 success: bool,
112 duration_millis: u64,
113 output: String,
114 },
115 ClientCommandHeader {
117 request_id: String,
118 worker_id: String,
119 worker_name: Option<String>,
120 payload: Vec<u8>,
121 },
122 ClientCommandStream {
124 request_id: String,
125 worker_id: String,
126 worker_name: Option<String>,
127 payload: Vec<u8>,
128 },
129 ClientCommandFooter {
131 request_id: String,
132 worker_id: String,
133 worker_name: Option<String>,
134 payload: Vec<u8>,
135 duration_millis: u64,
136 success: bool,
137 },
138 ClientCommandComplete {
140 request_id: String,
141 total_workers: u32,
142 },
143 ClientCommandError {
145 request_id: String,
146 error: String,
147 },
148 ClientCommandCancel {
150 request_id: String,
151 },
152 WorkerCommandRequest {
154 request_id: String,
155 command: String,
156 args: Vec<String>,
157 timeout_secs: u64,
158 options: String, },
160 WorkerCommandResponse {
162 request_id: String,
163 worker_id: String,
164 success: bool,
165 duration_millis: u64,
166 output: String,
167 },
168 WorkerCommandHeader {
170 request_id: String,
171 worker_id: String,
172 payload: Vec<u8>,
173 },
174 WorkerCommandStream {
176 request_id: String,
177 worker_id: String,
178 payload: Vec<u8>,
179 },
180 WorkerCommandFooter {
182 request_id: String,
183 worker_id: String,
184 payload: Vec<u8>,
185 duration_millis: u64,
186 success: bool,
187 },
188 WorkerCommandError {
190 request_id: String,
191 error: String,
192 },
193 WorkerCommandCancel {
195 request_id: String,
196 },
197 Announce {
204 meta: Box<ManagerPeerEntry>,
205 version: u64,
206 peers: Vec<ManagerPeerEntry>,
207 #[serde(default)]
209 workers: Vec<WorkerEntry>,
210 },
211 Goodbye,
213}
214
#[cfg(test)]
mod message_tests {
    use super::*;
    use crate::codec::{Codec, JsonCodec};

    /// Builds the `n`-th fixture peer: ids `m<n>`, host `h<n>`, both ports `n`,
    /// TLS material `cert<n>` / `fp<n>`, shared tenant `t` and cluster `c`.
    fn peer(n: u16) -> ManagerPeerEntry {
        ManagerPeerEntry {
            manager_id: format!("m{}", n),
            manager_name: format!("m{}", n),
            tenant: "t".into(),
            cluster: "c".into(),
            host: format!("h{}", n),
            tcp_port: n,
            quic_port: n,
            pub_fp: String::new(),
            csk_ver: 0,
            tls_cert: format!("cert{}", n),
            tls_fp: format!("fp{}", n),
            health: None,
        }
    }

    /// An `Announce` message survives an encode/decode cycle through
    /// `JsonCodec` with its payload intact.
    ///
    /// This test was previously named `bincode_announce_roundtrip` although it
    /// only exercises the JSON codec; the bincode path is covered by the
    /// `bincode_smoke` module.
    #[test]
    fn json_announce_roundtrip() {
        let msg = Message::Announce {
            meta: Box::new(peer(1)),
            version: 1,
            peers: vec![peer(2)],
            workers: Vec::new(),
        };

        let bytes = JsonCodec::encode(&msg);
        let decoded = JsonCodec::decode(&bytes).unwrap();

        match decoded {
            Message::Announce {
                meta,
                version,
                peers,
                workers,
            } => {
                assert_eq!(version, 1);
                assert_eq!(*meta, peer(1));
                assert_eq!(peers, vec![peer(2)]);
                assert!(workers.is_empty());
            }
            other => panic!("unexpected: {:?}", other),
        }
    }
}
267
#[cfg(test)]
mod bincode_smoke {
    use super::*;
    use crate::codec::{BincodeCodec, Codec};

    /// The single peer fixture shared by every test in this module.
    fn sample_entry() -> ManagerPeerEntry {
        ManagerPeerEntry {
            manager_id: "m1".into(),
            manager_name: "m1".into(),
            tenant: "t".into(),
            cluster: "c".into(),
            host: "h1".into(),
            tcp_port: 1,
            quic_port: 1,
            pub_fp: String::new(),
            csk_ver: 0,
            tls_cert: "cert1".into(),
            tls_fp: "fp1".into(),
            health: None,
        }
    }

    /// An `Announce` message survives a `BincodeCodec` encode/decode cycle.
    #[test]
    fn manager_peer_entry_bincode_roundtrip() {
        let entry = sample_entry();
        let msg = Message::Announce {
            meta: Box::new(entry.clone()),
            version: 0,
            peers: vec![entry],
            workers: Vec::new(),
        };

        let bytes = BincodeCodec::encode(&msg);
        let decoded = BincodeCodec::decode(&bytes).unwrap();

        match decoded {
            Message::Announce { version, peers, .. } => {
                assert_eq!(version, 0);
                assert_eq!(peers.len(), 1);
                assert_eq!(peers, vec![sample_entry()]);
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    /// Plain struct mirroring the `meta` + `peers` part of `Message::Announce`.
    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct Wrapper {
        meta: ManagerPeerEntry,
        peers: Vec<ManagerPeerEntry>,
    }

    /// Smoke-encodes a `Ping` with bincode, then round-trips a `Wrapper`.
    ///
    /// Despite the test name, the wrapper itself goes through `serde_json`;
    /// only the `Ping` touches `BincodeCodec`.
    #[test]
    fn wrapper_bincode_roundtrip() {
        let entry = sample_entry();
        let w = Wrapper {
            meta: entry.clone(),
            peers: vec![entry],
        };

        // Smoke check only: encoding must complete and yield a buffer.
        let bytes = BincodeCodec::encode(&Message::Ping { version: 42 });
        let _ = bytes.len();

        let json = serde_json::to_string(&w).unwrap();
        let w2: Wrapper = serde_json::from_str(&json).unwrap();
        assert_eq!(w.peers.len(), w2.peers.len());
        assert_eq!(w, w2);
    }

    /// Smoke-encodes a `Hello`, then round-trips a bare `Vec<ManagerPeerEntry>`
    /// through `bincode` directly (bypassing `BincodeCodec`).
    #[test]
    fn peer_entry_alone_bincode_roundtrip() {
        // Smoke check only: encoding must complete and yield a buffer.
        let bytes = BincodeCodec::encode(&Message::Hello {
            manager_id: "x".into(),
            nonce: [0u8; 32],
            sig: vec![],
        });
        let _ = bytes.len();

        let v = vec![sample_entry(), sample_entry()];
        let bv = bincode::serialize(&v).unwrap();
        let v2: Vec<ManagerPeerEntry> = bincode::deserialize(&bv).unwrap();
        assert_eq!(v2.len(), 2);
        assert_eq!(v2, v);
    }

    /// Baseline check that bincode round-trips a `Vec<String>`.
    #[test]
    fn sanity_bincode_vec_string() {
        let v = vec!["a".to_string(), "b".to_string()];
        let bytes = bincode::serialize(&v).unwrap();
        let vv: Vec<String> = bincode::deserialize(&bytes).unwrap();
        assert_eq!(vv.len(), 2);
        assert_eq!(vv, v);
    }
}
377
378#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
380pub struct ManagerPeerEntry {
381 pub manager_id: String,
382 pub manager_name: String,
383 pub tenant: String,
384 pub cluster: String,
385 pub host: String,
386 pub tcp_port: u16,
387 pub quic_port: u16,
388 pub pub_fp: String,
389 pub csk_ver: u32,
390 pub tls_cert: String,
391 pub tls_fp: String,
392
393 #[serde(default)]
395 pub health: Option<HealthMetrics>,
396}
397
398#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
400pub struct HealthMetrics {
401 pub health_score: f32, pub load_percentage: f32, pub max_workers: Option<u32>, pub current_workers: u32, pub avg_cpu: Option<f32>, pub avg_memory: Option<f32>, pub last_health_update: u64, }
409
410impl Default for HealthMetrics {
411 fn default() -> Self {
412 Self {
413 health_score: 1.0, load_percentage: 0.0,
415 max_workers: None, current_workers: 0,
417 avg_cpu: None,
418 avg_memory: None,
419 last_health_update: 0,
420 }
421 }
422}
423
424impl ManagerPeerEntry {
425 pub fn calculate_load_factor(&self) -> f32 {
427 match &self.health {
428 Some(health) => (health.load_percentage / 100.0).clamp(0.0, 1.0),
429 None => 0.0,
430 }
431 }
432
433 pub fn weighted_score(&self, rtt_ms: Option<f64>) -> f64 {
435 match &self.health {
436 Some(health) => {
437 let health_factor = health.health_score as f64;
438 let load_factor = 1.0 - self.calculate_load_factor() as f64;
440 let rtt_factor = rtt_ms.map(|rtt| 1.0 / (1.0 + rtt / 100.0)).unwrap_or(1.0);
441
442 0.4 * health_factor + 0.4 * load_factor + 0.2 * rtt_factor
444 }
445 None => 0.5, }
447 }
448}
449
450#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
451pub enum ConnectionState {
452 #[default]
453 Inactive,
454 Client,
455 Server,
456}
457
458#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
460pub struct WorkerEntry {
461 pub worker_id: String,
462 pub manager_id: String,
463 #[serde(default)]
464 pub worker_name: Option<String>,
465 #[serde(default)]
466 pub last_seen: Option<u64>,
467 #[serde(default)]
468 pub connected_since: Option<u64>,
469 #[serde(default)]
470 pub disconnected_at: Option<u64>,
471}
472
473#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
474pub struct AliveEntry {
475 pub meta: ManagerPeerEntry,
476 pub last_seen: u64,
477}
478
479#[derive(Debug, Serialize, Deserialize)]
480pub struct PingRequest {
481 pub host: String,
482}
483
484#[derive(Debug, Serialize, Deserialize)]
485pub struct PingResponse {
486 pub success: bool,
487 pub latency_ms: u32,
488}