// volli_core/lib.rs

1#![cfg_attr(test, allow(unused_crate_dependencies))]
2
3use crate::env_config::config_dir_env;
4use serde::{Deserialize, Serialize};
5use std::path::PathBuf;
6
7/// Return the base directory for all configuration files.
8pub fn config_dir() -> PathBuf {
9    if let Some(dir) = config_dir_env() {
10        PathBuf::from(dir)
11    } else if let Some(mut path) = dirs_next::config_dir() {
12        path.push("volli");
13        path
14    } else {
15        dirs_next::home_dir()
16            .unwrap_or_else(|| PathBuf::from("."))
17            .join(".volli")
18    }
19}
20
/// Default port used for TCP connections.
pub const DEFAULT_TCP_PORT: u16 = 4242;
/// Default port used for QUIC connections.
/// NOTE: same number as TCP; the two protocols use separate transports.
pub const DEFAULT_QUIC_PORT: u16 = 4242;
// Public submodules of the core crate.
pub mod codec;
pub mod env_config;
pub mod handshake;
pub mod namegen;
pub mod peer_db;
pub mod peer_store;
pub mod profile;
pub mod token;
pub mod util;
pub mod worker;

// Convenience re-exports so downstream crates can use the common
// configuration and worker types without naming the submodules.
pub use env_config::{
    ConfigDirGuard, ConfigGuard, EnvironmentConfig, env_config, override_config_dir,
    override_env_config, override_env_config_patch,
};
pub use worker::{Protocol, Role, WorkerConfig};
41
/// Basic message type used between worker and server.
///
/// NOTE(review): bincode encodes enum variants by index, so variant
/// order here is part of the wire format — append new variants at the
/// end rather than reordering. Confirm against the codec module.
#[derive(Debug, Serialize, Deserialize)]
pub enum Message {
    /// Liveness probe carrying the sender's version counter.
    Ping {
        version: u64,
    },
    /// Reply to [`Message::Ping`]; `mac` presumably identifies the
    /// responding machine (e.g. a MAC address) — TODO confirm.
    Pong {
        mac: String,
        version: u64,
    },
    /// Worker authentication request.
    Auth {
        token: String,
        // `#[serde(default)]` keeps decoding compatible with senders
        // that omit the optional identity fields.
        #[serde(default)]
        worker_id: Option<String>,
        #[serde(default)]
        worker_name: Option<String>,
    },
    /// Token-authenticated join request (answered by
    /// [`Message::JoinResponse`], presumably — confirm).
    Join {
        token: String,
    },
    /// Authentication succeeded.
    AuthOk,
    /// Authentication failed.
    AuthErr,
    /// Request a replacement for the given token.
    TokenRefreshRequest {
        token: String,
    },
    /// Successful refresh carrying the new token.
    TokenRefreshOk {
        token: String,
    },
    /// Refresh rejected; `reason` explains why.
    TokenRefreshErr {
        reason: String,
    },
    /// Manager-to-manager handshake opener: identity plus nonce and
    /// signature (looks like a signed challenge — verify against the
    /// `handshake` module).
    Hello {
        manager_id: String,
        nonce: [u8; 32],
        sig: Vec<u8>,
    },
    /// Handshake reply mirroring [`Message::Hello`].
    Welcome {
        manager_id: String,
        nonce: [u8; 32],
        sig: Vec<u8>,
    },
    /// Distribution of the versioned 32-byte cluster key (`csk`).
    ClusterKey {
        ver: u32,
        csk: [u8; 32],
    },
    /// Join response with cluster key and peer info
    JoinResponse {
        ver: u32,
        csk: [u8; 32],
        peer: Box<ManagerPeerEntry>,
    },
    /// Client authentication using cluster keys
    ClientAuth {
        token: String,
    },
    /// Client command request for distributed execution
    ClientCommandRequest {
        request_id: String,
        command: String,
        args: Vec<String>,
        target: String, // Serialized CommandTarget
        timeout_secs: u64,
        options: String, // Serialized CommandOptions
    },
    /// Client command response
    ClientCommandResponse {
        request_id: String,
        worker_id: String,
        worker_name: Option<String>,
        success: bool,
        duration_millis: u64,
        output: String,
    },
    /// Client streaming: header frame (manager -> client)
    ClientCommandHeader {
        request_id: String,
        worker_id: String,
        worker_name: Option<String>,
        payload: Vec<u8>,
    },
    /// Client streaming: progress frame (manager -> client)
    ClientCommandStream {
        request_id: String,
        worker_id: String,
        worker_name: Option<String>,
        payload: Vec<u8>,
    },
    /// Client streaming: footer/completion frame per worker (manager -> client)
    ClientCommandFooter {
        request_id: String,
        worker_id: String,
        worker_name: Option<String>,
        payload: Vec<u8>,
        duration_millis: u64,
        success: bool,
    },
    /// Client command completion
    ClientCommandComplete {
        request_id: String,
        total_workers: u32,
    },
    /// Client command error
    ClientCommandError {
        request_id: String,
        error: String,
    },
    /// Client command cancel request
    ClientCommandCancel {
        request_id: String,
    },
    /// Worker command request (manager -> worker)
    WorkerCommandRequest {
        request_id: String,
        command: String,
        args: Vec<String>,
        timeout_secs: u64,
        options: String, // Serialized CommandOptions
    },
    /// Worker command response with results (worker -> manager)
    WorkerCommandResponse {
        request_id: String,
        worker_id: String,
        success: bool,
        duration_millis: u64,
        output: String,
    },
    /// Worker streaming header (binary payload of CommandHeader)
    WorkerCommandHeader {
        request_id: String,
        worker_id: String,
        payload: Vec<u8>,
    },
    /// Worker streaming frame (binary payload of CommandStream)
    WorkerCommandStream {
        request_id: String,
        worker_id: String,
        payload: Vec<u8>,
    },
    /// Worker streaming footer (binary payload of CommandFooter)
    WorkerCommandFooter {
        request_id: String,
        worker_id: String,
        payload: Vec<u8>,
        duration_millis: u64,
        success: bool,
    },
    /// Worker command error (worker -> manager)
    WorkerCommandError {
        request_id: String,
        error: String,
    },
    /// Cancel a running command on a worker (manager -> worker)
    WorkerCommandCancel {
        request_id: String,
    },
    /// Manager heartbeat and peer list gossip.
    ///
    /// `version` is a monotonically increasing counter for the sender's
    /// peer table. `peers` contains the full list of known peers when
    /// `version` has advanced since the last heartbeat; otherwise it may
    /// be empty to indicate no changes.
    Announce {
        meta: Box<ManagerPeerEntry>,
        version: u64,
        peers: Vec<ManagerPeerEntry>,
        /// List of known workers when `version` has advanced. Empty otherwise.
        #[serde(default)]
        workers: Vec<WorkerEntry>,
    },
    /// Worker indicates it is intentionally closing this session (e.g., after migration)
    Goodbye,
}
214
#[cfg(test)]
mod message_tests {
    use super::*;
    use crate::codec::Codec;

    /// Round-trip an `Announce` message through the JSON codec and
    /// verify the version and peer list survive.
    ///
    /// Renamed from `bincode_announce_roundtrip`: the body uses
    /// `JsonCodec`, not `BincodeCodec`, so the old name was misleading.
    /// The bincode path is exercised by the `bincode_smoke` module.
    #[test]
    fn json_announce_roundtrip() {
        let meta = ManagerPeerEntry {
            manager_id: "m1".into(),
            manager_name: "m1".into(),
            tenant: "t".into(),
            cluster: "c".into(),
            host: "h1".into(),
            tcp_port: 1,
            quic_port: 1,
            pub_fp: String::new(),
            csk_ver: 0,
            tls_cert: "cert1".into(),
            tls_fp: "fp1".into(),
            health: None,
        };
        // A second, distinct peer to make the peers list non-trivial.
        let extra = ManagerPeerEntry {
            manager_id: "m2".into(),
            manager_name: "m2".into(),
            tenant: "t".into(),
            cluster: "c".into(),
            host: "h2".into(),
            tcp_port: 2,
            quic_port: 2,
            pub_fp: String::new(),
            csk_ver: 0,
            tls_cert: "cert2".into(),
            tls_fp: "fp2".into(),
            health: None,
        };
        let msg = Message::Announce {
            meta: Box::new(meta),
            version: 1,
            peers: vec![extra],
            workers: Vec::new(),
        };
        let bytes = crate::codec::JsonCodec::encode(&msg);
        let decoded = crate::codec::JsonCodec::decode(&bytes).unwrap();
        match decoded {
            Message::Announce { version, peers, .. } => {
                assert_eq!(version, 1);
                assert_eq!(peers.len(), 1);
            }
            other => panic!("unexpected: {:?}", other),
        }
    }
}
267
#[cfg(test)]
mod bincode_smoke {
    use super::*;
    use crate::codec::Codec;

    /// Build the peer entry shared by every test in this module.
    fn sample_entry() -> ManagerPeerEntry {
        ManagerPeerEntry {
            manager_id: "m1".into(),
            manager_name: "m1".into(),
            tenant: "t".into(),
            cluster: "c".into(),
            host: "h1".into(),
            tcp_port: 1,
            quic_port: 1,
            pub_fp: String::new(),
            csk_ver: 0,
            tls_cert: "cert1".into(),
            tls_fp: "fp1".into(),
            health: None,
        }
    }

    /// Announce with a populated peer list survives a bincode round trip.
    #[test]
    fn manager_peer_entry_bincode_roundtrip() {
        let entry = sample_entry();
        let msg = Message::Announce {
            meta: Box::new(entry.clone()),
            version: 0,
            peers: vec![entry],
            workers: Vec::new(),
        };
        let encoded = crate::codec::BincodeCodec::encode(&msg);
        let decoded = crate::codec::BincodeCodec::decode(&encoded).unwrap();
        match decoded {
            Message::Announce { version, peers, .. } => {
                assert_eq!(version, 0);
                assert_eq!(peers.len(), 1);
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    /// Minimal struct embedding peer entries, for serde smoke tests.
    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct Wrapper {
        meta: ManagerPeerEntry,
        peers: Vec<ManagerPeerEntry>,
    }

    /// Wrapper serializes through serde_json; bincode encoding of a
    /// simple message does not panic.
    #[test]
    fn wrapper_bincode_roundtrip() {
        let entry = sample_entry();
        let wrapper = Wrapper {
            meta: entry.clone(),
            peers: vec![entry],
        };
        let encoded = crate::codec::BincodeCodec::encode(&Message::Ping { version: 42 });
        let _ = encoded.len();
        // Also ensure serde serialization of Wrapper works via serde_json
        let json = serde_json::to_string(&wrapper).unwrap();
        let restored: Wrapper = serde_json::from_str(&json).unwrap();
        assert_eq!(wrapper.peers.len(), restored.peers.len());
    }

    /// A bare Vec<ManagerPeerEntry> round-trips through bincode.
    #[test]
    fn peer_entry_alone_bincode_roundtrip() {
        let entry = sample_entry();
        let encoded = crate::codec::BincodeCodec::encode(&Message::Hello {
            manager_id: "x".into(),
            nonce: [0u8; 32],
            sig: vec![],
        });
        let _ = encoded.len();
        // Serialize vec<ManagerPeerEntry> with serde bincode directly to confirm
        let pair = vec![entry.clone(), entry];
        let bytes = bincode::serialize(&pair).unwrap();
        let restored: Vec<ManagerPeerEntry> = bincode::deserialize(&bytes).unwrap();
        assert_eq!(restored.len(), 2);
    }

    /// Baseline: bincode handles a plain Vec<String>.
    #[test]
    fn sanity_bincode_vec_string() {
        let strings: Vec<String> = ["a", "b"].iter().map(|s| s.to_string()).collect();
        let bytes = bincode::serialize(&strings).unwrap();
        let restored: Vec<String> = bincode::deserialize(&bytes).unwrap();
        assert_eq!(restored.len(), 2);
    }
}
377
/// Metadata advertised by managers in heartbeats and peer storage
///
/// NOTE(review): field order is part of the bincode wire format —
/// append new fields rather than reordering.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct ManagerPeerEntry {
    // Stable identifier vs. human-readable name of the manager.
    pub manager_id: String,
    pub manager_name: String,
    // Tenant/cluster scoping — presumably used for peer grouping;
    // confirm against peer_store/peer_db usage.
    pub tenant: String,
    pub cluster: String,
    // Network endpoint: host plus per-protocol ports.
    pub host: String,
    pub tcp_port: u16,
    pub quic_port: u16,
    // Fingerprint of the manager's public key (format not shown here).
    pub pub_fp: String,
    // Cluster-key version this peer knows (see `Message::ClusterKey`).
    pub csk_ver: u32,
    // TLS certificate and its fingerprint, as strings.
    pub tls_cert: String,
    pub tls_fp: String,

    // Optional health metrics; always serialized for bincode compatibility
    #[serde(default)]
    pub health: Option<HealthMetrics>,
}
397
/// Health and load metrics for manager nodes
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct HealthMetrics {
    /// 0.0-1.0, higher is better.
    pub health_score: f32,
    /// 0.0-100.0, current utilization.
    pub load_percentage: f32,
    /// Maximum worker capacity; `None` = unlimited.
    pub max_workers: Option<u32>,
    /// Current worker count.
    pub current_workers: u32,
    /// Optional system metric (averaged CPU; units not shown here).
    pub avg_cpu: Option<f32>,
    /// Optional system metric (averaged memory; units not shown here).
    pub avg_memory: Option<f32>,
    /// Unix timestamp of the last health update.
    pub last_health_update: u64,
}
409
410impl Default for HealthMetrics {
411    fn default() -> Self {
412        Self {
413            health_score: 1.0, // Start with perfect health
414            load_percentage: 0.0,
415            max_workers: None, // Unlimited by default
416            current_workers: 0,
417            avg_cpu: None,
418            avg_memory: None,
419            last_health_update: 0,
420        }
421    }
422}
423
424impl ManagerPeerEntry {
425    /// Calculate load factor based on reported load percentage (0.0 ..= 1.0)
426    pub fn calculate_load_factor(&self) -> f32 {
427        match &self.health {
428            Some(health) => (health.load_percentage / 100.0).clamp(0.0, 1.0),
429            None => 0.0,
430        }
431    }
432
433    /// Calculate weighted score combining health_score, load_percentage, and RTT
434    pub fn weighted_score(&self, rtt_ms: Option<f64>) -> f64 {
435        match &self.health {
436            Some(health) => {
437                let health_factor = health.health_score as f64;
438                // Use reported load percentage rather than worker counts to avoid leaking worker totals
439                let load_factor = 1.0 - self.calculate_load_factor() as f64;
440                let rtt_factor = rtt_ms.map(|rtt| 1.0 / (1.0 + rtt / 100.0)).unwrap_or(1.0);
441
442                // Weighted combination: 40% health, 40% load, 20% RTT
443                0.4 * health_factor + 0.4 * load_factor + 0.2 * rtt_factor
444            }
445            None => 0.5, // Default score when no health data available
446        }
447    }
448}
449
/// State of a connection slot — presumably between mesh peers; confirm
/// against peer_store usage.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum ConnectionState {
    /// No active connection (the default).
    #[default]
    Inactive,
    /// This side acts as the connecting (client) end.
    Client,
    /// This side acts as the accepting (server) end.
    Server,
}
457
/// Worker presence advertised across the mesh.
///
/// All optional fields use `#[serde(default)]` so entries from senders
/// that omit them still decode.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkerEntry {
    // Worker identity and the manager it is attached to.
    pub worker_id: String,
    pub manager_id: String,
    #[serde(default)]
    pub worker_name: Option<String>,
    // Timestamps — presumably Unix seconds like
    // `HealthMetrics::last_health_update`; TODO confirm.
    #[serde(default)]
    pub last_seen: Option<u64>,
    #[serde(default)]
    pub connected_since: Option<u64>,
    #[serde(default)]
    pub disconnected_at: Option<u64>,
}
472
/// A known-alive peer: its advertised metadata plus the time it was
/// last seen (presumably a Unix timestamp — confirm against callers).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct AliveEntry {
    pub meta: ManagerPeerEntry,
    pub last_seen: u64,
}
478
/// Request to ping the given host (consumer not visible in this file).
#[derive(Debug, Serialize, Deserialize)]
pub struct PingRequest {
    pub host: String,
}
483
/// Result of a ping attempt: whether it succeeded and the measured
/// round-trip latency in milliseconds.
#[derive(Debug, Serialize, Deserialize)]
pub struct PingResponse {
    pub success: bool,
    pub latency_ms: u32,
}