#![cfg_attr(test, allow(unused_crate_dependencies))]
use crate::env_config::config_dir_env;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
/// Resolves the directory used for configuration files.
///
/// Candidates are tried in this order, and the first one available wins:
/// 1. the value returned by `config_dir_env()`, used verbatim;
/// 2. the platform configuration directory reported by `dirs_next`, with a
///    `volli` component appended;
/// 3. the home directory reported by `dirs_next` (or `.` when it is unknown),
///    with a `.volli` component appended.
///
/// The function only builds the path; it does not touch the filesystem.
pub fn config_dir() -> PathBuf {
    if let Some(overridden) = config_dir_env() {
        return PathBuf::from(overridden);
    }
    if let Some(platform_dir) = dirs_next::config_dir() {
        return platform_dir.join("volli");
    }
    let home = dirs_next::home_dir().unwrap_or_else(|| PathBuf::from("."));
    home.join(".volli")
}
/// Default TCP port number (4242).
pub const DEFAULT_TCP_PORT: u16 = 4242;
/// Default QUIC port number (4242); numerically the same as [`DEFAULT_TCP_PORT`].
pub const DEFAULT_QUIC_PORT: u16 = 4242;
pub mod codec;
pub mod env_config;
pub mod handshake;
pub mod namegen;
pub mod peer_db;
pub mod peer_store;
pub mod profile;
pub mod token;
pub mod util;
pub mod worker;
pub use env_config::{
ConfigDirGuard, ConfigGuard, EnvironmentConfig, env_config, override_config_dir,
override_env_config, override_env_config_patch,
};
pub use worker::{Protocol, Role, WorkerConfig};
/// Messages serialized with serde and passed through the crate's codecs.
///
/// Variants and fields must stay in their declared order: index-based encodings
/// such as bincode identify a variant by its position, so reordering or inserting
/// in the middle changes the wire format. Append new variants at the end.
///
/// NOTE(review): judging by the names, the variants fall into handshake/auth,
/// client-facing command, worker-facing command and peer-announcement groups;
/// confirm the exact flows against the protocol handlers.
#[derive(Debug, Serialize, Deserialize)]
pub enum Message {
    /// Ping carrying a `version` value.
    Ping {
        version: u64,
    },
    /// Pong carrying a `mac` string and a `version` value.
    Pong {
        mac: String,
        version: u64,
    },
    /// Authentication message carrying a `token` plus optional worker identity.
    Auth {
        token: String,
        /// Optional worker id; `None` when the field is missing from the payload.
        #[serde(default)]
        worker_id: Option<String>,
        /// Optional worker name; `None` when the field is missing from the payload.
        #[serde(default)]
        worker_name: Option<String>,
    },
    /// Join message carrying a `token`.
    Join {
        token: String,
    },
    /// Unit variant; by name, the positive authentication reply.
    AuthOk,
    /// Unit variant; by name, the negative authentication reply.
    AuthErr,
    /// Token refresh request carrying a `token`.
    TokenRefreshRequest {
        token: String,
    },
    /// Token refresh success carrying a `token` (presumably the refreshed one; confirm).
    TokenRefreshOk {
        token: String,
    },
    /// Token refresh failure carrying a textual `reason`.
    TokenRefreshErr {
        reason: String,
    },
    /// Carries a `manager_id`, a 32-byte `nonce` and signature bytes `sig`.
    Hello {
        manager_id: String,
        nonce: [u8; 32],
        sig: Vec<u8>,
    },
    /// Same shape as [`Message::Hello`]; presumably its reply (confirm).
    Welcome {
        manager_id: String,
        nonce: [u8; 32],
        sig: Vec<u8>,
    },
    /// Carries a `ver` number and 32 bytes of `csk`.
    ///
    /// NOTE(review): presumably a key version and the cluster key material; confirm.
    ClusterKey {
        ver: u32,
        csk: [u8; 32],
    },
    /// Same `ver`/`csk` pair as [`Message::ClusterKey`] plus a boxed peer entry.
    JoinResponse {
        ver: u32,
        csk: [u8; 32],
        peer: Box<ManagerPeerEntry>,
    },
    /// Client authentication carrying a `token`.
    ClientAuth {
        token: String,
    },
    /// Client command request: the command line (`command` + `args`), a `target`
    /// string, a timeout in seconds and an `options` string.
    ///
    /// NOTE(review): the formats of `target` and `options` are not visible here.
    ClientCommandRequest {
        request_id: String,
        command: String,
        args: Vec<String>,
        target: String,
        timeout_secs: u64,
        options: String,
    },
    /// Client-facing command result for one worker: success flag, duration in
    /// milliseconds and textual output.
    ClientCommandResponse {
        request_id: String,
        worker_id: String,
        worker_name: Option<String>,
        success: bool,
        duration_millis: u64,
        output: String,
    },
    /// Client-facing header chunk: a `payload` tagged with request and worker identity.
    ClientCommandHeader {
        request_id: String,
        worker_id: String,
        worker_name: Option<String>,
        payload: Vec<u8>,
    },
    /// Client-facing stream chunk: a `payload` tagged with request and worker identity.
    ClientCommandStream {
        request_id: String,
        worker_id: String,
        worker_name: Option<String>,
        payload: Vec<u8>,
    },
    /// Client-facing footer chunk: a `payload` plus duration and success flag.
    ClientCommandFooter {
        request_id: String,
        worker_id: String,
        worker_name: Option<String>,
        payload: Vec<u8>,
        duration_millis: u64,
        success: bool,
    },
    /// Client-facing completion notice carrying a `total_workers` count.
    ClientCommandComplete {
        request_id: String,
        total_workers: u32,
    },
    /// Client-facing error for a request, with a textual `error`.
    ClientCommandError {
        request_id: String,
        error: String,
    },
    /// Client-facing cancellation, identified by `request_id`.
    ClientCommandCancel {
        request_id: String,
    },
    /// Worker-facing command request; same fields as
    /// [`Message::ClientCommandRequest`] minus `target`.
    WorkerCommandRequest {
        request_id: String,
        command: String,
        args: Vec<String>,
        timeout_secs: u64,
        options: String,
    },
    /// Worker-facing command result: success flag, duration in milliseconds and
    /// textual output.
    WorkerCommandResponse {
        request_id: String,
        worker_id: String,
        success: bool,
        duration_millis: u64,
        output: String,
    },
    /// Worker-facing header chunk: a `payload` tagged with request and worker ids.
    WorkerCommandHeader {
        request_id: String,
        worker_id: String,
        payload: Vec<u8>,
    },
    /// Worker-facing stream chunk: a `payload` tagged with request and worker ids.
    WorkerCommandStream {
        request_id: String,
        worker_id: String,
        payload: Vec<u8>,
    },
    /// Worker-facing footer chunk: a `payload` plus duration and success flag.
    WorkerCommandFooter {
        request_id: String,
        worker_id: String,
        payload: Vec<u8>,
        duration_millis: u64,
        success: bool,
    },
    /// Worker-facing error for a request, with a textual `error`.
    WorkerCommandError {
        request_id: String,
        error: String,
    },
    /// Worker-facing cancellation, identified by `request_id`.
    WorkerCommandCancel {
        request_id: String,
    },
    /// Announcement carrying one boxed peer entry (`meta`), a `version` value,
    /// a list of further peer entries and a list of worker entries.
    Announce {
        meta: Box<ManagerPeerEntry>,
        version: u64,
        peers: Vec<ManagerPeerEntry>,
        /// Empty when the field is missing from the payload.
        #[serde(default)]
        workers: Vec<WorkerEntry>,
    },
    /// Unit variant with no payload.
    Goodbye,
}
#[cfg(test)]
mod message_tests {
    use super::*;
    use crate::codec::{Codec, JsonCodec};

    /// Builds a peer entry whose id, name, host, ports and TLS fields are derived
    /// from `n`; the remaining fields keep their `Default` values (empty
    /// fingerprint, key version 0, no health metrics).
    fn peer(n: u16) -> ManagerPeerEntry {
        ManagerPeerEntry {
            manager_id: format!("m{n}"),
            manager_name: format!("m{n}"),
            tenant: "t".into(),
            cluster: "c".into(),
            host: format!("h{n}"),
            tcp_port: n,
            quic_port: n,
            tls_cert: format!("cert{n}"),
            tls_fp: format!("fp{n}"),
            ..ManagerPeerEntry::default()
        }
    }

    /// Encodes an `Announce` and decodes it again, checking that the version and
    /// the peer list survive.
    ///
    /// NOTE(review): despite the name, this goes through `JsonCodec`, not bincode.
    #[test]
    fn bincode_announce_roundtrip() {
        let announce = Message::Announce {
            meta: Box::new(peer(1)),
            version: 1,
            peers: vec![peer(2)],
            workers: Vec::new(),
        };

        let wire = JsonCodec::encode(&announce);
        let decoded = JsonCodec::decode(&wire).unwrap();

        let (version, peers) = match decoded {
            Message::Announce { version, peers, .. } => (version, peers),
            other => panic!("unexpected: {:?}", other),
        };
        assert_eq!(version, 1);
        assert_eq!(peers.len(), 1);
    }
}
#[cfg(test)]
mod bincode_smoke {
    use super::*;
    use crate::codec::{BincodeCodec, Codec};

    /// The single fixture shared by the tests in this module.
    fn sample_entry() -> ManagerPeerEntry {
        ManagerPeerEntry {
            manager_id: "m1".into(),
            manager_name: "m1".into(),
            tenant: "t".into(),
            cluster: "c".into(),
            host: "h1".into(),
            tcp_port: 1,
            quic_port: 1,
            tls_cert: "cert1".into(),
            tls_fp: "fp1".into(),
            ..ManagerPeerEntry::default()
        }
    }

    /// An `Announce` holding the fixture survives a `BincodeCodec` roundtrip.
    #[test]
    fn manager_peer_entry_bincode_roundtrip() {
        let entry = sample_entry();
        let announce = Message::Announce {
            meta: Box::new(entry.clone()),
            version: 0,
            peers: vec![entry],
            workers: Vec::new(),
        };

        let wire = BincodeCodec::encode(&announce);
        let decoded = BincodeCodec::decode(&wire).unwrap();

        let (version, peers) = match decoded {
            Message::Announce { version, peers, .. } => (version, peers),
            other => panic!("unexpected: {:?}", other),
        };
        assert_eq!(version, 0);
        assert_eq!(peers.len(), 1);
    }

    /// Local stand-in with the same `meta` + `peers` shape as `Message::Announce`.
    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct Wrapper {
        meta: ManagerPeerEntry,
        peers: Vec<ManagerPeerEntry>,
    }

    /// NOTE(review): despite the name, `Wrapper` is roundtripped through
    /// `serde_json`; bincode is only exercised on an unrelated `Ping`.
    #[test]
    fn wrapper_bincode_roundtrip() {
        let entry = sample_entry();
        let original = Wrapper {
            meta: entry.clone(),
            peers: vec![entry],
        };

        // Encoding must not panic; the bytes themselves are not inspected.
        let ping_bytes = BincodeCodec::encode(&Message::Ping { version: 42 });
        let _ = ping_bytes.len();

        let json = serde_json::to_string(&original).unwrap();
        let restored: Wrapper = serde_json::from_str(&json).unwrap();
        assert_eq!(original.peers.len(), restored.peers.len());
    }

    /// A plain `Vec<ManagerPeerEntry>` survives raw `bincode` serialization.
    #[test]
    fn peer_entry_alone_bincode_roundtrip() {
        // Encoding must not panic; the bytes themselves are not inspected.
        let hello_bytes = BincodeCodec::encode(&Message::Hello {
            manager_id: "x".into(),
            nonce: [0u8; 32],
            sig: vec![],
        });
        let _ = hello_bytes.len();

        let entries = vec![sample_entry(), sample_entry()];
        let raw = bincode::serialize(&entries).unwrap();
        let restored: Vec<ManagerPeerEntry> = bincode::deserialize(&raw).unwrap();
        assert_eq!(restored.len(), 2);
    }

    /// Baseline check that bincode itself roundtrips a `Vec<String>`.
    #[test]
    fn sanity_bincode_vec_string() {
        let words = vec!["a".to_string(), "b".to_string()];
        let raw = bincode::serialize(&words).unwrap();
        let restored: Vec<String> = bincode::deserialize(&raw).unwrap();
        assert_eq!(restored.len(), 2);
    }
}
/// Descriptor of a manager peer, as carried in `Message::Announce`
/// (`meta`, `peers`) and `Message::JoinResponse` (`peer`).
///
/// Field order is part of the serialized layout for position-based encodings
/// such as bincode; do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct ManagerPeerEntry {
    /// Identifier of the manager.
    pub manager_id: String,
    /// Name of the manager.
    pub manager_name: String,
    /// Tenant string.
    pub tenant: String,
    /// Cluster string.
    pub cluster: String,
    /// Host string (format not visible here; presumably a hostname or IP).
    pub host: String,
    /// TCP port number.
    pub tcp_port: u16,
    /// QUIC port number.
    pub quic_port: u16,
    /// NOTE(review): presumably a public-key fingerprint; confirm.
    pub pub_fp: String,
    /// NOTE(review): presumably the cluster key version (cf. `Message::ClusterKey::ver`); confirm.
    pub csk_ver: u32,
    /// TLS certificate as a string (encoding not visible here).
    pub tls_cert: String,
    /// NOTE(review): presumably the TLS certificate fingerprint; confirm.
    pub tls_fp: String,
    /// Optional health metrics; `None` when the field is missing from the payload.
    /// With `None`, `calculate_load_factor` returns 0.0 and `weighted_score` returns 0.5.
    #[serde(default)]
    pub health: Option<HealthMetrics>,
}
/// Health and load figures attached to a `ManagerPeerEntry`.
///
/// Field order is part of the serialized layout for position-based encodings
/// such as bincode; do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct HealthMetrics {
    /// Health score; `Default` uses 1.0 and `ManagerPeerEntry::weighted_score`
    /// weights it by 0.4. Presumably within 0.0..=1.0 (confirm).
    pub health_score: f32,
    /// Load as a percentage; `ManagerPeerEntry::calculate_load_factor` divides it
    /// by 100 and clamps the result to 0.0..=1.0.
    pub load_percentage: f32,
    /// Optional worker limit; `None` in `Default`.
    pub max_workers: Option<u32>,
    /// Current number of workers.
    pub current_workers: u32,
    /// Optional average CPU figure (unit not visible here).
    pub avg_cpu: Option<f32>,
    /// Optional average memory figure (unit not visible here).
    pub avg_memory: Option<f32>,
    /// NOTE(review): presumably a timestamp of the last update; unit not visible here.
    pub last_health_update: u64,
}
impl Default for HealthMetrics {
fn default() -> Self {
Self {
health_score: 1.0, load_percentage: 0.0,
max_workers: None, current_workers: 0,
avg_cpu: None,
avg_memory: None,
last_health_update: 0,
}
}
}
impl ManagerPeerEntry {
    /// Returns the load as a fraction in `0.0..=1.0`.
    ///
    /// The fraction is `load_percentage / 100`, clamped to that range. An entry
    /// without health metrics reports `0.0`.
    pub fn calculate_load_factor(&self) -> f32 {
        self.health
            .as_ref()
            .map_or(0.0, |metrics| (metrics.load_percentage / 100.0).clamp(0.0, 1.0))
    }

    /// Combines health, spare capacity and round-trip time into one score.
    ///
    /// The score is `0.4 * health_score + 0.4 * (1 - load_factor) + 0.2 * rtt_term`,
    /// where `rtt_term` is `1 / (1 + rtt_ms / 100)`, or `1.0` when no RTT is given.
    /// An entry without health metrics always scores `0.5`, regardless of `rtt_ms`.
    pub fn weighted_score(&self, rtt_ms: Option<f64>) -> f64 {
        let metrics = match self.health.as_ref() {
            Some(metrics) => metrics,
            // No metrics at all: fixed mid-range score.
            None => return 0.5,
        };

        let health_term = f64::from(metrics.health_score);
        let headroom_term = 1.0 - f64::from(self.calculate_load_factor());
        let rtt_term = match rtt_ms {
            Some(rtt) => 1.0 / (1.0 + rtt / 100.0),
            None => 1.0,
        };

        0.4 * health_term + 0.4 * headroom_term + 0.2 * rtt_term
    }
}
/// State of a connection; `Inactive` is the `Default`.
///
/// NOTE(review): `Client` / `Server` presumably name the role this side plays on
/// the connection; confirm at the use sites.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum ConnectionState {
    /// Default state.
    #[default]
    Inactive,
    /// Client state.
    Client,
    /// Server state.
    Server,
}
/// Worker record, as carried in the `workers` list of `Message::Announce`.
///
/// Every optional field falls back to `None` when missing from the payload.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkerEntry {
    /// Identifier of the worker.
    pub worker_id: String,
    /// Identifier of a manager (presumably the one the worker is attached to; confirm).
    pub manager_id: String,
    /// Optional worker name.
    #[serde(default)]
    pub worker_name: Option<String>,
    /// Optional last-seen stamp (unit not visible here).
    #[serde(default)]
    pub last_seen: Option<u64>,
    /// Optional connected-since stamp (unit not visible here).
    #[serde(default)]
    pub connected_since: Option<u64>,
    /// Optional disconnected-at stamp (unit not visible here).
    #[serde(default)]
    pub disconnected_at: Option<u64>,
}
/// A peer entry paired with a last-seen stamp.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct AliveEntry {
    /// The peer's descriptor.
    pub meta: ManagerPeerEntry,
    /// Last-seen stamp (unit not visible here).
    pub last_seen: u64,
}
/// Serializable ping request naming a `host`.
#[derive(Debug, Serialize, Deserialize)]
pub struct PingRequest {
    /// Host string (format not visible here).
    pub host: String,
}
/// Serializable ping result: a success flag and a latency in milliseconds.
#[derive(Debug, Serialize, Deserialize)]
pub struct PingResponse {
    /// Whether the ping succeeded.
    pub success: bool,
    /// Latency in milliseconds.
    pub latency_ms: u32,
}