use crate::{CompilationKind, toolchain::ToolchainInfo};
use rand::RngExt;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema)]
pub struct WorkerId(pub String);
impl WorkerId {
pub fn new(id: impl Into<String>) -> Self {
Self(id.into())
}
pub fn as_str(&self) -> &str {
&self.0
}
}
impl std::fmt::Display for WorkerId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum WorkerStatus {
#[default]
Healthy,
Degraded,
Unreachable,
Draining,
Drained,
Disabled,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum CircuitState {
#[default]
Closed,
Open,
HalfOpen,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum RequiredRuntime {
#[default]
None,
Rust,
Bun,
Node,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum CommandPriority {
Low,
#[default]
Normal,
High,
}
impl std::fmt::Display for CommandPriority {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let value = match self {
Self::Low => "low",
Self::Normal => "normal",
Self::High => "high",
};
write!(f, "{}", value)
}
}
impl std::str::FromStr for CommandPriority {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.trim().to_lowercase().as_str() {
"low" => Ok(Self::Low),
"normal" => Ok(Self::Normal),
"high" => Ok(Self::High),
_ => Err(()),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SelectionRequest {
pub project: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub command: Option<String>,
#[serde(default)]
pub command_priority: CommandPriority,
pub estimated_cores: u32,
#[serde(default)]
pub preferred_workers: Vec<WorkerId>,
#[serde(default)]
pub toolchain: Option<ToolchainInfo>,
#[serde(default)]
pub required_runtime: RequiredRuntime,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub classification_duration_us: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub hook_pid: Option<u32>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum SelectionReason {
Success,
NoWorkersConfigured,
AllWorkersUnreachable,
AllCircuitsOpen,
AllWorkersBusy,
NoWorkersPassedHealth,
AllWorkersFailedPreflight,
AllWorkersFailedConvergence,
NoAdmissibleWorkers(String),
NoMatchingWorkers,
NoWorkersWithRuntime(String),
SelectionError(String),
AffinityPinned,
AffinityFallback,
}
impl std::fmt::Display for SelectionReason {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Success => write!(f, "worker assigned successfully"),
Self::NoWorkersConfigured => write!(f, "no workers configured"),
Self::AllWorkersUnreachable => write!(f, "all workers unreachable"),
Self::AllCircuitsOpen => write!(f, "all worker circuits open"),
Self::AllWorkersBusy => write!(f, "all workers at capacity"),
Self::NoWorkersPassedHealth => write!(f, "no workers passed health thresholds"),
Self::AllWorkersFailedPreflight => write!(f, "all workers failed preflight checks"),
Self::AllWorkersFailedConvergence => {
write!(f, "all workers failed repo convergence checks")
}
Self::NoAdmissibleWorkers(summary) => {
write!(f, "no admissible workers: {}", summary)
}
Self::NoMatchingWorkers => write!(f, "no matching workers found"),
Self::NoWorkersWithRuntime(rt) => write!(f, "no workers with {} installed", rt),
Self::SelectionError(e) => write!(f, "selection error: {}", e),
Self::AffinityPinned => write!(f, "worker assigned via affinity pinning"),
Self::AffinityFallback => write!(f, "worker assigned via last-success fallback"),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum SelectionStrategy {
Priority,
Fastest,
#[default]
Balanced,
CacheAffinity,
FairFastest,
}
impl std::fmt::Display for SelectionStrategy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let name = match self {
Self::Priority => "priority",
Self::Fastest => "fastest",
Self::Balanced => "balanced",
Self::CacheAffinity => "cache_affinity",
Self::FairFastest => "fair_fastest",
};
write!(f, "{}", name)
}
}
impl std::str::FromStr for SelectionStrategy {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.trim().to_lowercase().as_str() {
"priority" => Ok(Self::Priority),
"fastest" => Ok(Self::Fastest),
"balanced" => Ok(Self::Balanced),
"cache_affinity" | "cache-affinity" | "cacheaffinity" => Ok(Self::CacheAffinity),
"fair_fastest" | "fair-fastest" | "fairfastest" => Ok(Self::FairFastest),
_ => Err(format!(
"unknown selection strategy '{}', expected one of: priority, fastest, balanced, cache_affinity, fair_fastest",
s
)),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SelectionConfig {
#[serde(default)]
pub strategy: SelectionStrategy,
#[serde(default = "default_min_success_rate")]
pub min_success_rate: f64,
#[serde(default)]
pub weights: SelectionWeightConfig,
#[serde(default)]
pub fairness: FairnessConfig,
#[serde(default)]
pub affinity: AffinityConfig,
#[serde(default = "default_max_load_per_core")]
pub max_load_per_core: Option<f64>,
#[serde(default = "default_min_free_gb")]
pub min_free_gb: Option<f64>,
}
impl Default for SelectionConfig {
fn default() -> Self {
Self {
strategy: SelectionStrategy::default(),
min_success_rate: default_min_success_rate(),
weights: SelectionWeightConfig::default(),
fairness: FairnessConfig::default(),
affinity: AffinityConfig::default(),
max_load_per_core: default_max_load_per_core(),
min_free_gb: default_min_free_gb(),
}
}
}
fn default_min_success_rate() -> f64 {
0.8
}
fn default_max_load_per_core() -> Option<f64> {
Some(2.0) }
fn default_min_free_gb() -> Option<f64> {
Some(10.0) }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SelectionWeightConfig {
#[serde(default = "default_weight_speedscore")]
pub speedscore: f64,
#[serde(default = "default_weight_slots")]
pub slots: f64,
#[serde(default = "default_weight_health")]
pub health: f64,
#[serde(default = "default_weight_cache")]
pub cache: f64,
#[serde(default = "default_weight_network")]
pub network: f64,
#[serde(default = "default_weight_priority")]
pub priority: f64,
#[serde(default = "default_half_open_penalty")]
pub half_open_penalty: f64,
}
impl Default for SelectionWeightConfig {
fn default() -> Self {
Self {
speedscore: default_weight_speedscore(),
slots: default_weight_slots(),
health: default_weight_health(),
cache: default_weight_cache(),
network: default_weight_network(),
priority: default_weight_priority(),
half_open_penalty: default_half_open_penalty(),
}
}
}
fn default_weight_speedscore() -> f64 {
0.5
}
fn default_weight_slots() -> f64 {
0.4
}
fn default_weight_health() -> f64 {
0.3
}
fn default_weight_cache() -> f64 {
0.2
}
fn default_weight_network() -> f64 {
0.1
}
fn default_weight_priority() -> f64 {
0.1
}
fn default_half_open_penalty() -> f64 {
0.5
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FairnessConfig {
#[serde(default = "default_fairness_lookback_secs")]
pub lookback_secs: u64,
#[serde(default = "default_max_consecutive_selections")]
pub max_consecutive_selections: u32,
}
impl Default for FairnessConfig {
fn default() -> Self {
Self {
lookback_secs: default_fairness_lookback_secs(),
max_consecutive_selections: default_max_consecutive_selections(),
}
}
}
fn default_fairness_lookback_secs() -> u64 {
300 }
fn default_max_consecutive_selections() -> u32 {
3
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AffinityConfig {
#[serde(default = "default_affinity_enabled")]
pub enabled: bool,
#[serde(default = "default_affinity_pin_minutes")]
pub pin_minutes: u64,
#[serde(default = "default_last_success_fallback")]
pub enable_last_success_fallback: bool,
#[serde(default = "default_fallback_min_success_rate")]
pub fallback_min_success_rate: f64,
}
impl Default for AffinityConfig {
fn default() -> Self {
Self {
enabled: default_affinity_enabled(),
pin_minutes: default_affinity_pin_minutes(),
enable_last_success_fallback: default_last_success_fallback(),
fallback_min_success_rate: default_fallback_min_success_rate(),
}
}
}
fn default_affinity_enabled() -> bool {
true
}
fn default_affinity_pin_minutes() -> u64 {
60 }
fn default_last_success_fallback() -> bool {
true
}
fn default_fallback_min_success_rate() -> f64 {
0.5 }
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct SelectedWorker {
pub id: WorkerId,
pub host: String,
pub user: String,
pub identity_file: String,
pub slots_available: u32,
pub speed_score: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SelectionResponse {
pub worker: Option<SelectedWorker>,
pub reason: SelectionReason,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub build_id: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReleaseRequest {
pub worker_id: WorkerId,
pub slots: u32,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub build_id: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub exit_code: Option<i32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub duration_ms: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub bytes_transferred: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub timing: Option<CommandTimingBreakdown>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum BuildHeartbeatPhase {
SyncUp,
Execute,
SyncDown,
Finalize,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct BuildHeartbeatRequest {
pub build_id: u64,
pub worker_id: WorkerId,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub hook_pid: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub remote_pgid_file: Option<String>,
pub phase: BuildHeartbeatPhase,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub detail: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub progress_counter: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub progress_percent: Option<f64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConfig {
pub id: WorkerId,
pub host: String,
pub user: String,
pub identity_file: String,
pub total_slots: u32,
#[serde(default = "default_priority")]
pub priority: u32,
#[serde(default)]
pub tags: Vec<String>,
}
fn default_priority() -> u32 {
100
}
impl Default for WorkerConfig {
fn default() -> Self {
Self {
id: WorkerId::new("default-worker"),
host: "localhost".to_string(),
user: "user".to_string(),
identity_file: "~/.ssh/id_rsa".to_string(),
total_slots: 4,
priority: default_priority(),
tags: Vec::new(),
}
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
pub struct WorkerCapabilities {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub rustc_version: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub bun_version: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub node_version: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub npm_version: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub num_cpus: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub load_avg_1: Option<f64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub load_avg_5: Option<f64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub load_avg_15: Option<f64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub disk_free_gb: Option<f64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub disk_total_gb: Option<f64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub projects_root_ok: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub projects_root_issue: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub projects_root_checked_at_unix_ms: Option<i64>,
}
impl WorkerCapabilities {
pub fn new() -> Self {
Self::default()
}
pub fn mock_with_rust() -> Self {
Self {
rustc_version: Some("1.85.0-nightly (mock)".to_string()),
projects_root_ok: Some(true),
..Self::default()
}
}
pub fn has_bun(&self) -> bool {
self.bun_version.is_some()
}
pub fn has_node(&self) -> bool {
self.node_version.is_some()
}
pub fn has_rust(&self) -> bool {
self.rustc_version.is_some()
}
pub fn load_per_core(&self) -> Option<f64> {
match (self.load_avg_1, self.num_cpus) {
(Some(load), Some(cpus)) if cpus > 0 => Some(load / cpus as f64),
_ => None,
}
}
pub fn is_high_load(&self, max_load_per_core: f64) -> Option<bool> {
self.load_per_core().map(|lpc| lpc > max_load_per_core)
}
pub fn is_low_disk(&self, min_free_gb: f64) -> Option<bool> {
self.disk_free_gb.map(|free| free < min_free_gb)
}
pub fn is_topology_healthy(&self) -> Option<bool> {
self.projects_root_ok
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PathTopologyConfig {
#[serde(default)]
pub canonical_root: Option<String>,
#[serde(default)]
pub alias_root: Option<String>,
}
impl PathTopologyConfig {
pub fn to_policy(&self) -> crate::path_topology::PathTopologyPolicy {
let canonical = self
.canonical_root
.as_deref()
.filter(|s| !s.is_empty())
.map(|s| shellexpand::tilde(s).into_owned())
.unwrap_or_else(|| crate::path_topology::DEFAULT_CANONICAL_PROJECT_ROOT.to_string());
let alias = self
.alias_root
.as_deref()
.filter(|s| !s.is_empty())
.map(|s| shellexpand::tilde(s).into_owned())
.unwrap_or_else(|| crate::path_topology::DEFAULT_ALIAS_PROJECT_ROOT.to_string());
crate::path_topology::PathTopologyPolicy::new(
PathBuf::from(canonical),
PathBuf::from(alias),
)
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RchConfig {
#[serde(default)]
pub general: GeneralConfig,
#[serde(default)]
pub compilation: CompilationConfig,
#[serde(default)]
pub transfer: TransferConfig,
#[serde(default)]
pub environment: EnvironmentConfig,
#[serde(default)]
pub circuit: CircuitBreakerConfig,
#[serde(default)]
pub output: OutputConfig,
#[serde(default)]
pub self_healing: SelfHealingConfig,
#[serde(default)]
pub self_test: SelfTestConfig,
#[serde(default)]
pub selection: SelectionConfig,
#[serde(default)]
pub execution: ExecutionConfig,
#[serde(default)]
pub alerts: AlertsConfig,
#[serde(default)]
pub fleet: FleetConfig,
#[serde(default)]
pub path_topology: PathTopologyConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertsConfig {
#[serde(default = "default_true")]
pub enabled: bool,
#[serde(default = "default_alert_suppress_duplicates_secs")]
pub suppress_duplicates_secs: u64,
#[serde(default = "default_alert_cleared_retention_secs")]
pub cleared_retention_secs: u64,
#[serde(default)]
pub webhook: Option<WebhookConfig>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct WebhookConfig {
pub url: Option<String>,
#[serde(default)]
pub secret: Option<String>,
#[serde(default = "default_webhook_timeout_secs")]
pub timeout_secs: u64,
#[serde(default = "default_webhook_retry_count")]
pub retry_count: u32,
#[serde(default)]
pub events: Vec<String>,
}
fn default_webhook_timeout_secs() -> u64 {
5
}
fn default_webhook_retry_count() -> u32 {
3
}
fn default_alert_suppress_duplicates_secs() -> u64 {
300
}
fn default_alert_cleared_retention_secs() -> u64 {
300
}
impl Default for AlertsConfig {
fn default() -> Self {
Self {
enabled: true,
suppress_duplicates_secs: default_alert_suppress_duplicates_secs(),
cleared_retention_secs: default_alert_cleared_retention_secs(),
webhook: None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FleetConfig {
#[serde(default = "default_fleet_ssh_connect_timeout_secs")]
pub ssh_connect_timeout_secs: u64,
#[serde(default = "default_fleet_ssh_command_timeout_secs")]
pub ssh_command_timeout_secs: u64,
#[serde(default = "default_fleet_min_disk_space_mb")]
pub min_disk_space_mb: u64,
#[serde(default = "default_fleet_max_load_average")]
pub max_load_average: f64,
#[serde(default = "default_fleet_max_concurrent_workers")]
pub max_concurrent_workers: usize,
#[serde(default = "default_fleet_retry_count")]
pub retry_count: u32,
#[serde(default = "default_fleet_retry_delay_ms")]
pub retry_delay_ms: u64,
}
fn default_fleet_ssh_connect_timeout_secs() -> u64 {
10
}
fn default_fleet_ssh_command_timeout_secs() -> u64 {
30
}
fn default_fleet_min_disk_space_mb() -> u64 {
500
}
fn default_fleet_max_load_average() -> f64 {
10.0
}
fn default_fleet_max_concurrent_workers() -> usize {
10
}
fn default_fleet_retry_count() -> u32 {
2
}
fn default_fleet_retry_delay_ms() -> u64 {
1000
}
impl Default for FleetConfig {
fn default() -> Self {
Self {
ssh_connect_timeout_secs: default_fleet_ssh_connect_timeout_secs(),
ssh_command_timeout_secs: default_fleet_ssh_command_timeout_secs(),
min_disk_space_mb: default_fleet_min_disk_space_mb(),
max_load_average: default_fleet_max_load_average(),
max_concurrent_workers: default_fleet_max_concurrent_workers(),
retry_count: default_fleet_retry_count(),
retry_delay_ms: default_fleet_retry_delay_ms(),
}
}
}
impl FleetConfig {
pub fn ssh_connect_timeout(&self) -> std::time::Duration {
std::time::Duration::from_secs(self.ssh_connect_timeout_secs)
}
pub fn ssh_command_timeout(&self) -> std::time::Duration {
std::time::Duration::from_secs(self.ssh_command_timeout_secs)
}
pub fn retry_delay(&self) -> std::time::Duration {
std::time::Duration::from_millis(self.retry_delay_ms)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GeneralConfig {
#[serde(default = "default_true")]
pub enabled: bool,
#[serde(default)]
pub force_local: bool,
#[serde(default)]
pub force_remote: bool,
#[serde(default = "default_log_level")]
pub log_level: String,
#[serde(default = "default_socket_path")]
pub socket_path: String,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct EnvironmentConfig {
#[serde(default)]
pub allowlist: Vec<String>,
}
impl Default for GeneralConfig {
fn default() -> Self {
Self {
enabled: true,
force_local: false,
force_remote: false,
log_level: "info".to_string(),
socket_path: default_socket_path(),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum OutputVisibility {
#[default]
None,
Summary,
Verbose,
}
impl std::fmt::Display for OutputVisibility {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let value = match self {
OutputVisibility::None => "none",
OutputVisibility::Summary => "summary",
OutputVisibility::Verbose => "verbose",
};
write!(f, "{}", value)
}
}
impl std::str::FromStr for OutputVisibility {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.trim().to_lowercase().as_str() {
"none" | "silent" | "quiet" => Ok(Self::None),
"summary" | "short" => Ok(Self::Summary),
"verbose" | "debug" => Ok(Self::Verbose),
_ => Err(()),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum ColorMode {
#[default]
Always,
Auto,
Never,
}
impl std::fmt::Display for ColorMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let value = match self {
ColorMode::Always => "always",
ColorMode::Auto => "auto",
ColorMode::Never => "never",
};
write!(f, "{}", value)
}
}
impl std::str::FromStr for ColorMode {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.trim().to_lowercase().as_str() {
"always" | "force" | "yes" | "true" => Ok(Self::Always),
"auto" | "detect" => Ok(Self::Auto),
"never" | "none" | "no" | "false" => Ok(Self::Never),
_ => Err(()),
}
}
}
fn default_color_mode() -> ColorMode {
ColorMode::default()
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutputConfig {
#[serde(default = "default_output_visibility")]
pub visibility: OutputVisibility,
#[serde(default)]
pub first_run_complete: bool,
#[serde(default = "default_color_mode")]
pub color_mode: ColorMode,
}
impl Default for OutputConfig {
fn default() -> Self {
Self {
visibility: OutputVisibility::None,
first_run_complete: false,
color_mode: ColorMode::default(),
}
}
}
fn default_autostart_cooldown_secs() -> u64 {
30
}
fn default_autostart_timeout_secs() -> u64 {
3
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum SelfHealingLogLevel {
Debug,
#[default]
Info,
Warn,
Error,
}
impl SelfHealingLogLevel {
pub fn from_env_str(s: &str) -> Option<Self> {
match s.trim().to_ascii_lowercase().as_str() {
"debug" => Some(Self::Debug),
"info" => Some(Self::Info),
"warn" | "warning" => Some(Self::Warn),
"error" => Some(Self::Error),
_ => None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SelfHealingConfig {
#[serde(default = "default_true")]
pub hook_starts_daemon: bool,
#[serde(default = "default_true")]
pub daemon_installs_hooks: bool,
#[serde(default = "default_autostart_cooldown_secs")]
pub auto_start_cooldown_secs: u64,
#[serde(
default = "default_autostart_timeout_secs",
alias = "daemon_start_timeout"
)]
pub auto_start_timeout_secs: u64,
#[serde(default)]
pub self_healing_log_level: SelfHealingLogLevel,
}
impl Default for SelfHealingConfig {
fn default() -> Self {
Self {
hook_starts_daemon: default_true(),
daemon_installs_hooks: default_true(),
auto_start_cooldown_secs: default_autostart_cooldown_secs(),
auto_start_timeout_secs: default_autostart_timeout_secs(),
self_healing_log_level: SelfHealingLogLevel::default(),
}
}
}
impl SelfHealingConfig {
pub fn with_env_overrides(mut self) -> Self {
if let Ok(val) = std::env::var("RCH_NO_SELF_HEALING")
&& (val == "1" || val.eq_ignore_ascii_case("true"))
{
self.hook_starts_daemon = false;
self.daemon_installs_hooks = false;
return self;
}
if let Ok(val) = std::env::var("RCH_HOOK_STARTS_DAEMON") {
self.hook_starts_daemon = val != "0" && !val.eq_ignore_ascii_case("false");
}
if let Ok(val) = std::env::var("RCH_DAEMON_INSTALLS_HOOKS") {
self.daemon_installs_hooks = val != "0" && !val.eq_ignore_ascii_case("false");
}
if let Ok(val) = std::env::var("RCH_AUTO_START_TIMEOUT_SECS")
&& let Ok(secs) = val.parse()
{
self.auto_start_timeout_secs = secs;
}
if let Ok(val) = std::env::var("RCH_AUTO_START_COOLDOWN_SECS")
&& let Ok(secs) = val.parse()
{
self.auto_start_cooldown_secs = secs;
}
if let Ok(val) = std::env::var("RCH_SELF_HEALING_LOG_LEVEL") {
if let Some(level) = SelfHealingLogLevel::from_env_str(&val) {
self.self_healing_log_level = level;
} else {
tracing::warn!(
target: "rch::self_healing",
invalid = %val,
"RCH_SELF_HEALING_LOG_LEVEL has invalid value; using default=info"
);
}
}
self
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum SelfTestFailureAction {
#[default]
Alert,
DisableWorker,
AlertAndDisable,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum SelfTestWorkers {
All(String),
List(Vec<WorkerId>),
}
impl Default for SelfTestWorkers {
fn default() -> Self {
Self::All("all".to_string())
}
}
impl SelfTestWorkers {
pub fn resolve(&self) -> Option<Vec<WorkerId>> {
match self {
SelfTestWorkers::All(value) => {
if value.eq_ignore_ascii_case("all") {
None
} else {
Some(vec![WorkerId::new(value.clone())])
}
}
SelfTestWorkers::List(list) => Some(list.clone()),
}
}
}
fn default_self_test_retry_count() -> u32 {
3
}
fn default_self_test_retry_delay() -> String {
"5m".to_string()
}
fn default_self_test_enabled() -> bool {
false
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SelfTestConfig {
#[serde(default = "default_self_test_enabled")]
pub enabled: bool,
#[serde(default)]
pub schedule: Option<String>,
#[serde(default)]
pub interval: Option<String>,
#[serde(default)]
pub workers: SelfTestWorkers,
#[serde(default)]
pub on_failure: SelfTestFailureAction,
#[serde(default = "default_self_test_retry_count")]
pub retry_count: u32,
#[serde(default = "default_self_test_retry_delay")]
pub retry_delay: String,
}
impl Default for SelfTestConfig {
fn default() -> Self {
Self {
enabled: default_self_test_enabled(),
schedule: None,
interval: None,
workers: SelfTestWorkers::default(),
on_failure: SelfTestFailureAction::default(),
retry_count: default_self_test_retry_count(),
retry_delay: default_self_test_retry_delay(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircuitBreakerConfig {
#[serde(default = "default_circuit_failure_threshold")]
pub failure_threshold: u32,
#[serde(default = "default_circuit_success_threshold")]
pub success_threshold: u32,
#[serde(default = "default_circuit_error_rate_threshold")]
pub error_rate_threshold: f64,
#[serde(default = "default_circuit_window_secs")]
pub window_secs: u64,
#[serde(default = "default_circuit_open_cooldown_secs")]
pub open_cooldown_secs: u64,
#[serde(default = "default_circuit_half_open_max_probes")]
pub half_open_max_probes: u32,
}
impl Default for CircuitBreakerConfig {
fn default() -> Self {
Self {
failure_threshold: default_circuit_failure_threshold(),
success_threshold: default_circuit_success_threshold(),
error_rate_threshold: default_circuit_error_rate_threshold(),
window_secs: default_circuit_window_secs(),
open_cooldown_secs: default_circuit_open_cooldown_secs(),
half_open_max_probes: default_circuit_half_open_max_probes(),
}
}
}
#[derive(Debug, Clone, Default)]
pub struct CircuitStats {
state: CircuitState,
consecutive_failures: u32,
consecutive_successes: u32,
window_successes: u32,
window_failures: u32,
opened_at: Option<u64>,
last_state_change: u64,
active_probes: u32,
recent_results: Vec<bool>,
}
const CIRCUIT_HISTORY_SIZE: usize = 10;
impl CircuitStats {
pub fn new() -> Self {
Self {
state: CircuitState::Closed,
consecutive_failures: 0,
consecutive_successes: 0,
window_successes: 0,
window_failures: 0,
opened_at: None,
last_state_change: Self::now_millis(),
active_probes: 0,
recent_results: Vec::with_capacity(CIRCUIT_HISTORY_SIZE),
}
}
pub fn state(&self) -> CircuitState {
self.state
}
pub fn opened_at(&self) -> Option<u64> {
self.opened_at
}
pub fn last_state_change(&self) -> u64 {
self.last_state_change
}
pub fn consecutive_failures(&self) -> u32 {
self.consecutive_failures
}
pub fn error_rate(&self) -> f64 {
let total = self.window_successes + self.window_failures;
if total == 0 {
return 0.0;
}
self.window_failures as f64 / total as f64
}
pub fn record_success(&mut self) {
self.window_successes += 1;
self.consecutive_failures = 0;
self.push_result(true);
if self.state == CircuitState::HalfOpen {
self.consecutive_successes += 1;
if self.active_probes > 0 {
self.active_probes -= 1;
}
}
}
pub fn record_failure(&mut self) {
self.window_failures += 1;
self.consecutive_failures += 1;
self.push_result(false);
if self.state == CircuitState::HalfOpen {
self.consecutive_successes = 0;
if self.active_probes > 0 {
self.active_probes -= 1;
}
}
}
pub fn should_open(&self, config: &CircuitBreakerConfig) -> bool {
if self.state != CircuitState::Closed {
return false;
}
if self.consecutive_failures >= config.failure_threshold {
return true;
}
let total = self.window_successes + self.window_failures;
if total >= 5 && self.error_rate() >= config.error_rate_threshold {
return true;
}
false
}
pub fn should_half_open(&self, config: &CircuitBreakerConfig) -> bool {
if self.state != CircuitState::Open {
return false;
}
let now = Self::now_millis();
let cooldown_ms = config.open_cooldown_secs * 1000;
if let Some(opened_at) = self.opened_at {
now.saturating_sub(opened_at) >= cooldown_ms
} else {
false
}
}
pub fn should_close(&self, config: &CircuitBreakerConfig) -> bool {
if self.state != CircuitState::HalfOpen {
return false;
}
self.consecutive_successes >= config.success_threshold
}
pub fn can_probe(&self, config: &CircuitBreakerConfig) -> bool {
if self.state != CircuitState::HalfOpen {
return false;
}
self.active_probes < config.half_open_max_probes
}
pub fn start_probe(&mut self, config: &CircuitBreakerConfig) -> bool {
if !self.can_probe(config) {
return false;
}
self.active_probes += 1;
true
}
pub fn open(&mut self) {
if self.state != CircuitState::Open {
self.state = CircuitState::Open;
self.opened_at = Some(Self::now_millis());
self.last_state_change = Self::now_millis();
self.consecutive_successes = 0;
self.active_probes = 0;
}
}
pub fn half_open(&mut self) {
if self.state != CircuitState::HalfOpen {
self.state = CircuitState::HalfOpen;
self.last_state_change = Self::now_millis();
self.consecutive_successes = 0;
self.active_probes = 0;
}
}
pub fn close(&mut self) {
if self.state != CircuitState::Closed {
self.state = CircuitState::Closed;
self.last_state_change = Self::now_millis();
self.opened_at = None;
self.consecutive_failures = 0;
self.consecutive_successes = 0;
self.active_probes = 0;
self.window_successes = 0;
self.window_failures = 0;
}
}
pub fn reset_window(&mut self) {
self.window_successes = 0;
self.window_failures = 0;
}
pub fn recent_results(&self) -> &[bool] {
&self.recent_results
}
pub fn recovery_remaining_secs(&self, config: &CircuitBreakerConfig) -> Option<u64> {
if self.state != CircuitState::Open {
return None;
}
let now = Self::now_millis();
let cooldown_ms = config.open_cooldown_secs * 1000;
if let Some(opened_at) = self.opened_at {
let elapsed_ms = now.saturating_sub(opened_at);
if elapsed_ms < cooldown_ms {
return Some((cooldown_ms - elapsed_ms) / 1000);
}
}
None
}
fn push_result(&mut self, success: bool) {
if self.recent_results.len() >= CIRCUIT_HISTORY_SIZE {
self.recent_results.remove(0);
}
self.recent_results.push(success);
}
fn now_millis() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompilationConfig {
#[serde(default = "default_confidence")]
pub confidence_threshold: f64,
#[serde(default = "default_min_local_time")]
pub min_local_time_ms: u64,
#[serde(default = "default_speedup_threshold")]
pub remote_speedup_threshold: f64,
#[serde(default = "default_build_slots")]
pub build_slots: u32,
#[serde(default = "default_test_slots")]
pub test_slots: u32,
#[serde(default = "default_check_slots")]
pub check_slots: u32,
#[serde(default = "default_build_timeout")]
pub build_timeout_sec: u64,
#[serde(default = "default_test_timeout")]
pub test_timeout_sec: u64,
#[serde(default = "default_bun_timeout")]
pub bun_timeout_sec: u64,
#[serde(default = "default_external_timeout_enabled")]
pub external_timeout_enabled: bool,
}
impl Default for CompilationConfig {
fn default() -> Self {
Self {
confidence_threshold: 0.85,
min_local_time_ms: 2000,
remote_speedup_threshold: default_speedup_threshold(),
build_slots: default_build_slots(),
test_slots: default_test_slots(),
check_slots: default_check_slots(),
build_timeout_sec: default_build_timeout(),
test_timeout_sec: default_test_timeout(),
bun_timeout_sec: default_bun_timeout(),
external_timeout_enabled: default_external_timeout_enabled(),
}
}
}
fn default_build_slots() -> u32 {
4
}
fn default_test_slots() -> u32 {
8
}
fn default_check_slots() -> u32 {
2
}
fn default_build_timeout() -> u64 {
300
}
fn default_test_timeout() -> u64 {
1800
}
fn default_bun_timeout() -> u64 {
600
}
fn default_external_timeout_enabled() -> bool {
true
}
impl CompilationConfig {
pub fn timeout_for_kind(&self, kind: Option<CompilationKind>) -> std::time::Duration {
let secs = match kind {
Some(CompilationKind::BunTest) | Some(CompilationKind::BunTypecheck) => {
self.bun_timeout_sec
}
Some(CompilationKind::CargoTest) | Some(CompilationKind::CargoNextest) => {
self.test_timeout_sec
}
_ => self.build_timeout_sec,
};
std::time::Duration::from_secs(secs)
}
pub fn external_timeout_enabled(&self) -> bool {
self.external_timeout_enabled
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransferConfig {
#[serde(default = "default_compression")]
pub compression_level: u32,
#[serde(default = "default_excludes")]
pub exclude_patterns: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub ssh_server_alive_interval_secs: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub ssh_control_persist_secs: Option<u64>,
#[serde(default = "default_remote_base")]
pub remote_base: String,
#[serde(default)]
pub retry: RetryConfig,
#[serde(default)]
pub verify_artifacts: bool,
#[serde(default = "default_verify_max_size")]
pub verify_max_size_bytes: u64,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub max_transfer_mb: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub max_transfer_time_ms: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub bwlimit_kbps: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub estimated_bandwidth_bps: Option<u64>,
#[serde(default)]
pub adaptive_compression: bool,
#[serde(default = "default_min_compression")]
pub min_compression_level: u32,
#[serde(default = "default_max_compression")]
pub max_compression_level: u32,
}
impl Default for TransferConfig {
fn default() -> Self {
Self {
compression_level: 3,
exclude_patterns: default_excludes(),
ssh_server_alive_interval_secs: None,
ssh_control_persist_secs: None,
remote_base: default_remote_base(),
retry: RetryConfig::default(),
verify_artifacts: false,
verify_max_size_bytes: default_verify_max_size(),
max_transfer_mb: None,
max_transfer_time_ms: None,
bwlimit_kbps: None,
estimated_bandwidth_bps: None,
adaptive_compression: false,
min_compression_level: default_min_compression(),
max_compression_level: default_max_compression(),
}
}
}
impl TransferConfig {
pub fn select_compression_level(&self, estimated_bytes: Option<u64>) -> u32 {
if !self.adaptive_compression {
return self.compression_level;
}
let Some(bytes) = estimated_bytes else {
return self.compression_level;
};
const SMALL_THRESHOLD: u64 = 10_000_000; const LARGE_THRESHOLD: u64 = 200_000_000;
let level = if bytes < SMALL_THRESHOLD {
1 } else if bytes < LARGE_THRESHOLD {
3 } else {
7 };
level.clamp(self.min_compression_level, self.max_compression_level)
}
}
fn default_execution_allowlist() -> Vec<String> {
vec![
"cargo".to_string(),
"rustc".to_string(),
"nextest".to_string(),
"gcc".to_string(),
"g++".to_string(),
"clang".to_string(),
"clang++".to_string(),
"cc".to_string(),
"c++".to_string(),
"make".to_string(),
"cmake".to_string(),
"ninja".to_string(),
"meson".to_string(),
"bun".to_string(),
]
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionConfig {
#[serde(default = "default_execution_allowlist")]
pub allowlist: Vec<String>,
}
impl Default for ExecutionConfig {
fn default() -> Self {
Self {
allowlist: default_execution_allowlist(),
}
}
}
impl ExecutionConfig {
pub fn is_allowed(&self, command_base: &str) -> bool {
self.allowlist
.iter()
.any(|allowed| allowed.eq_ignore_ascii_case(command_base))
}
}
fn default_retry_max_attempts() -> u32 {
3
}
fn default_retry_base_delay_ms() -> u64 {
100
}
fn default_retry_max_delay_ms() -> u64 {
5000
}
fn default_retry_jitter_factor() -> f64 {
0.1
}
fn default_retry_total_timeout_ms() -> u64 {
30000
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
#[serde(default = "default_retry_max_attempts")]
pub max_attempts: u32,
#[serde(default = "default_retry_base_delay_ms")]
pub base_delay_ms: u64,
#[serde(default = "default_retry_max_delay_ms")]
pub max_delay_ms: u64,
#[serde(default = "default_retry_jitter_factor")]
pub jitter_factor: f64,
#[serde(default = "default_retry_total_timeout_ms")]
pub total_timeout_ms: u64,
}
impl Default for RetryConfig {
fn default() -> Self {
Self {
max_attempts: default_retry_max_attempts(),
base_delay_ms: default_retry_base_delay_ms(),
max_delay_ms: default_retry_max_delay_ms(),
jitter_factor: default_retry_jitter_factor(),
total_timeout_ms: default_retry_total_timeout_ms(),
}
}
}
impl RetryConfig {
pub fn no_retry() -> Self {
Self {
max_attempts: 1,
..Default::default()
}
}
pub fn delay_for_attempt(&self, attempt: u32) -> std::time::Duration {
if attempt == 0 {
return std::time::Duration::ZERO;
}
let base = self.base_delay_ms as f64;
let delay = base * (2.0_f64.powi(attempt as i32 - 1));
let capped = delay.min(self.max_delay_ms as f64);
let jittered = if self.jitter_factor > 0.0 {
let jitter_range = capped * self.jitter_factor;
let jitter = (rand::rng().random::<f64>() * 2.0 - 1.0) * jitter_range;
(capped + jitter).max(0.0)
} else {
capped
};
std::time::Duration::from_millis(jittered as u64)
}
pub fn should_retry(&self, attempt: u32, elapsed: std::time::Duration) -> bool {
if attempt >= self.max_attempts {
return false;
}
elapsed.as_millis() < self.total_timeout_ms as u128
}
}
pub fn default_remote_base() -> String {
"/tmp/rch".to_string()
}
pub fn validate_remote_base(path: &str) -> Result<String, String> {
let expanded = shellexpand::tilde(path).into_owned();
if !expanded.starts_with('/') {
return Err(format!(
"remote_base must be an absolute path, got: {}",
path
));
}
if expanded.contains("..") {
return Err(format!(
"remote_base must not contain path traversal (..): {}",
path
));
}
let normalized = expanded.trim_end_matches('/').to_string();
if normalized.is_empty() || normalized == "/" {
return Err("remote_base cannot be the root directory (safety restriction)".to_string());
}
let components: Vec<&str> = normalized.split('/').filter(|c| !c.is_empty()).collect();
if components.len() < 2 {
return Err(format!(
"remote_base must be at least 2 levels deep (e.g. /tmp/rch), got: {}",
normalized
));
}
Ok(normalized)
}
fn default_circuit_failure_threshold() -> u32 {
3
}
fn default_circuit_success_threshold() -> u32 {
2
}
fn default_circuit_error_rate_threshold() -> f64 {
0.5
}
fn default_circuit_window_secs() -> u64 {
60
}
fn default_circuit_open_cooldown_secs() -> u64 {
30
}
fn default_circuit_half_open_max_probes() -> u32 {
1
}
fn default_true() -> bool {
true
}
fn default_log_level() -> String {
"info".to_string()
}
pub fn default_socket_path() -> String {
if let Ok(runtime_dir) = std::env::var("XDG_RUNTIME_DIR")
&& !runtime_dir.trim().is_empty()
{
let path = PathBuf::from(runtime_dir).join("rch.sock");
return path.to_string_lossy().to_string();
}
if let Some(cache_dir) = dirs::cache_dir() {
let rch_cache = cache_dir.join("rch");
let _ = std::fs::create_dir_all(&rch_cache);
return rch_cache.join("rch.sock").to_string_lossy().to_string();
}
"/tmp/rch.sock".to_string()
}
#[allow(dead_code)]
fn default_output_visibility() -> OutputVisibility {
OutputVisibility::None
}
fn default_confidence() -> f64 {
0.85
}
fn default_min_local_time() -> u64 {
2000
}
fn default_speedup_threshold() -> f64 {
1.2
}
fn default_compression() -> u32 {
3
}
fn default_verify_max_size() -> u64 {
100 * 1024 * 1024 }
fn default_min_compression() -> u32 {
1
}
fn default_max_compression() -> u32 {
9
}
fn default_excludes() -> Vec<String> {
vec![
"target/".to_string(),
"*.rlib".to_string(),
"*.rmeta".to_string(),
".git/objects/".to_string(),
"node_modules/".to_string(),
".bun/".to_string(),
".npm/".to_string(),
".pnpm-store/".to_string(),
"dist/".to_string(),
"build/".to_string(),
".next/".to_string(),
".nuxt/".to_string(),
".turbo/".to_string(),
".parcel-cache/".to_string(),
".beads/".to_string(),
"coverage/".to_string(),
".nyc_output/".to_string(),
".cargo/credentials".to_string(),
".cargo/credentials.toml".to_string(),
".env".to_string(),
".env.*".to_string(),
"*.pem".to_string(),
"*.key".to_string(),
"credentials.json".to_string(),
"secrets.json".to_string(),
"secrets.yaml".to_string(),
"secrets.yml".to_string(),
"secrets.toml".to_string(),
"secrets.env".to_string(),
"secrets.txt".to_string(),
"secrets.conf".to_string(),
"secrets.cfg".to_string(),
"secrets.properties".to_string(),
"secrets.xml".to_string(),
]
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum BuildLocation {
#[default]
Local,
Remote,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BuildCancellationWorkerHealth {
pub status: String,
pub speed_score: f64,
pub used_slots: u32,
pub available_slots: u32,
pub pressure_state: String,
pub pressure_reason_code: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BuildCancellationMetadata {
pub operation_id: String,
pub origin: String,
pub reason_code: String,
#[serde(default)]
pub decision_path: Vec<String>,
pub escalation_stage: String,
pub escalation_count: u32,
pub remote_kill_attempted: bool,
pub cleanup_ok: bool,
pub history_cancelled: bool,
pub final_state: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub worker_health: Option<BuildCancellationWorkerHealth>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BuildRecord {
pub id: u64,
pub started_at: String,
pub completed_at: String,
pub project_id: String,
pub worker_id: Option<String>,
pub command: String,
pub exit_code: i32,
pub duration_ms: u64,
pub location: BuildLocation,
pub bytes_transferred: Option<u64>,
#[serde(default)]
pub timing: Option<CommandTimingBreakdown>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cancellation: Option<BuildCancellationMetadata>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BuildRecordInput {
pub started_at: String,
pub completed_at: String,
pub project_id: String,
pub worker_id: Option<String>,
pub command: String,
pub exit_code: i32,
pub duration_ms: u64,
pub location: BuildLocation,
pub bytes_transferred: Option<u64>,
#[serde(default)]
pub timing: Option<CommandTimingBreakdown>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cancellation: Option<BuildCancellationMetadata>,
}
impl BuildRecordInput {
pub fn into_record(self, id: u64) -> BuildRecord {
BuildRecord {
id,
started_at: self.started_at,
completed_at: self.completed_at,
project_id: self.project_id,
worker_id: self.worker_id,
command: self.command,
exit_code: self.exit_code,
duration_ms: self.duration_ms,
location: self.location,
bytes_transferred: self.bytes_transferred,
timing: self.timing,
cancellation: self.cancellation,
}
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BuildStats {
pub total_builds: usize,
pub success_count: usize,
pub failure_count: usize,
pub remote_count: usize,
pub local_count: usize,
pub avg_duration_ms: u64,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SavedTimeStats {
pub total_remote_duration_ms: u64,
pub estimated_local_duration_ms: u64,
pub time_saved_ms: u64,
pub builds_counted: usize,
pub avg_speedup: f64,
pub today_saved_ms: u64,
pub week_saved_ms: u64,
}
use std::collections::VecDeque;
use std::time::{Duration, Instant};
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CompilationTimingBreakdown {
#[serde(with = "duration_millis")]
pub rsync_up: Duration,
#[serde(with = "duration_millis")]
pub remote_build: Duration,
#[serde(with = "duration_millis")]
pub rsync_down: Duration,
#[serde(with = "duration_millis")]
pub total: Duration,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CommandTimingBreakdown {
#[serde(with = "option_duration_millis")]
pub classify: Option<Duration>,
#[serde(with = "option_duration_millis")]
pub select: Option<Duration>,
#[serde(with = "option_duration_millis")]
pub sync_up: Option<Duration>,
#[serde(with = "option_duration_millis")]
pub exec: Option<Duration>,
#[serde(with = "option_duration_millis")]
pub sync_down: Option<Duration>,
#[serde(with = "option_duration_millis")]
pub cleanup: Option<Duration>,
#[serde(with = "option_duration_millis")]
pub total: Option<Duration>,
}
mod duration_millis {
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::time::Duration;
pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
duration.as_millis().serialize(serializer)
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where
D: Deserializer<'de>,
{
let millis = u64::deserialize(deserializer)?;
Ok(Duration::from_millis(millis))
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompilationMetrics {
pub project_id: String,
pub worker_id: String,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub timing: CompilationTimingBreakdown,
#[serde(with = "option_duration_millis")]
pub local_build_time: Option<Duration>,
pub speedup: Option<f64>,
pub files_synced: u64,
pub bytes_transferred: u64,
pub exit_code: i32,
pub success: bool,
}
mod option_duration_millis {
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::time::Duration;
pub fn serialize<S>(duration: &Option<Duration>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match duration {
Some(d) => Some(d.as_millis() as u64).serialize(serializer),
None => Option::<u64>::None.serialize(serializer),
}
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<Duration>, D::Error>
where
D: Deserializer<'de>,
{
let opt: Option<u64> = Option::deserialize(deserializer)?;
Ok(opt.map(Duration::from_millis))
}
}
impl CompilationMetrics {
pub fn calculate_speedup(&mut self) {
if let Some(local) = self.local_build_time
&& self.timing.total.as_millis() > 0
{
self.speedup = Some(local.as_secs_f64() / self.timing.total.as_secs_f64());
}
}
pub fn is_beneficial(&self) -> bool {
self.speedup.map(|s| s > 1.0).unwrap_or(false)
}
}
impl Default for CompilationMetrics {
fn default() -> Self {
Self {
project_id: String::new(),
worker_id: String::new(),
timestamp: chrono::Utc::now(),
timing: CompilationTimingBreakdown::default(),
local_build_time: None,
speedup: None,
files_synced: 0,
bytes_transferred: 0,
exit_code: 0,
success: true,
}
}
}
#[derive(Debug)]
pub struct CompilationTimer {
project_id: String,
worker_id: String,
start: Instant,
phase_start: Instant,
rsync_up: Option<Duration>,
remote_build: Option<Duration>,
rsync_down: Option<Duration>,
}
impl CompilationTimer {
pub fn new(project_id: &str, worker_id: &str) -> Self {
let now = Instant::now();
Self {
project_id: project_id.to_string(),
worker_id: worker_id.to_string(),
start: now,
phase_start: now,
rsync_up: None,
remote_build: None,
rsync_down: None,
}
}
pub fn end_rsync_up(&mut self) {
self.rsync_up = Some(self.phase_start.elapsed());
self.phase_start = Instant::now();
tracing::info!(
rsync_up_ms = %self.rsync_up.unwrap().as_millis(),
"TIMING: rsync_up completed"
);
}
pub fn end_remote_build(&mut self) {
self.remote_build = Some(self.phase_start.elapsed());
self.phase_start = Instant::now();
tracing::info!(
remote_build_ms = %self.remote_build.unwrap().as_millis(),
"TIMING: remote_build completed"
);
}
pub fn end_rsync_down(&mut self) {
self.rsync_down = Some(self.phase_start.elapsed());
tracing::info!(
rsync_down_ms = %self.rsync_down.unwrap().as_millis(),
"TIMING: rsync_down completed"
);
}
pub fn finish(self, exit_code: i32, files: u64, bytes: u64) -> CompilationMetrics {
let total = self.start.elapsed();
tracing::info!(
total_ms = %total.as_millis(),
exit_code = %exit_code,
"TIMING: compilation completed"
);
CompilationMetrics {
project_id: self.project_id,
worker_id: self.worker_id,
timestamp: chrono::Utc::now(),
timing: CompilationTimingBreakdown {
rsync_up: self.rsync_up.unwrap_or_default(),
remote_build: self.remote_build.unwrap_or_default(),
rsync_down: self.rsync_down.unwrap_or_default(),
total,
},
local_build_time: None,
speedup: None,
files_synced: files,
bytes_transferred: bytes,
exit_code,
success: exit_code == 0,
}
}
}
#[derive(Debug)]
pub struct MetricsAggregator {
history: VecDeque<CompilationMetrics>,
max_history: usize,
}
impl MetricsAggregator {
pub fn new(max_history: usize) -> Self {
Self {
history: VecDeque::with_capacity(max_history),
max_history,
}
}
pub fn record(&mut self, metrics: CompilationMetrics) {
if self.history.len() >= self.max_history {
self.history.pop_front();
}
self.history.push_back(metrics);
}
pub fn average_speedup(&self) -> Option<f64> {
let speedups: Vec<f64> = self.history.iter().filter_map(|m| m.speedup).collect();
if speedups.is_empty() {
None
} else {
Some(speedups.iter().sum::<f64>() / speedups.len() as f64)
}
}
pub fn p50_total_time(&self) -> Option<Duration> {
self.percentile_total_time(0.50)
}
pub fn p95_total_time(&self) -> Option<Duration> {
self.percentile_total_time(0.95)
}
pub fn p99_total_time(&self) -> Option<Duration> {
self.percentile_total_time(0.99)
}
fn percentile_total_time(&self, percentile: f64) -> Option<Duration> {
let mut times: Vec<_> = self.history.iter().map(|m| m.timing.total).collect();
if times.is_empty() {
return None;
}
times.sort();
let idx = (((times.len() - 1) as f64 * percentile).round() as usize).min(times.len() - 1);
Some(times[idx])
}
pub fn success_rate(&self) -> f64 {
if self.history.is_empty() {
return 100.0;
}
let successes = self.history.iter().filter(|m| m.success).count();
(successes as f64 / self.history.len() as f64) * 100.0
}
pub fn count(&self) -> usize {
self.history.len()
}
pub fn metrics(&self) -> &VecDeque<CompilationMetrics> {
&self.history
}
pub fn clear(&mut self) {
self.history.clear();
}
}
impl Default for MetricsAggregator {
fn default() -> Self {
Self::new(1000)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_guard;
#[test]
fn test_circuit_state_default() {
let _guard = test_guard!();
assert_eq!(CircuitState::default(), CircuitState::Closed);
}
#[test]
fn test_circuit_config_defaults() {
let _guard = test_guard!();
let config = CircuitBreakerConfig::default();
assert_eq!(config.failure_threshold, 3);
assert_eq!(config.success_threshold, 2);
assert_eq!(config.error_rate_threshold, 0.5);
assert_eq!(config.window_secs, 60);
assert_eq!(config.open_cooldown_secs, 30);
assert_eq!(config.half_open_max_probes, 1);
}
#[test]
fn test_rch_config_has_circuit_defaults() {
let _guard = test_guard!();
let config = RchConfig::default();
assert_eq!(config.circuit.failure_threshold, 3);
}
#[test]
fn test_circuit_config_serde_roundtrip() {
let _guard = test_guard!();
let config = CircuitBreakerConfig {
failure_threshold: 5,
success_threshold: 3,
error_rate_threshold: 0.75,
window_secs: 120,
open_cooldown_secs: 45,
half_open_max_probes: 2,
};
let json = serde_json::to_string(&config).unwrap();
let parsed: CircuitBreakerConfig = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.failure_threshold, 5);
assert_eq!(parsed.success_threshold, 3);
assert_eq!(parsed.error_rate_threshold, 0.75);
assert_eq!(parsed.window_secs, 120);
assert_eq!(parsed.open_cooldown_secs, 45);
assert_eq!(parsed.half_open_max_probes, 2);
}
#[test]
fn test_worker_capabilities_default() {
let _guard = test_guard!();
let caps = WorkerCapabilities::default();
assert!(caps.rustc_version.is_none());
assert!(caps.bun_version.is_none());
assert!(caps.node_version.is_none());
assert!(caps.npm_version.is_none());
assert!(caps.projects_root_ok.is_none());
assert!(caps.projects_root_issue.is_none());
assert!(caps.projects_root_checked_at_unix_ms.is_none());
}
#[test]
fn test_worker_capabilities_new() {
let _guard = test_guard!();
let caps = WorkerCapabilities::new();
assert!(caps.rustc_version.is_none());
assert!(!caps.has_rust());
assert!(!caps.has_bun());
assert!(!caps.has_node());
assert!(caps.is_topology_healthy().is_none());
}
#[test]
fn test_worker_capabilities_has_rust() {
let _guard = test_guard!();
let mut caps = WorkerCapabilities::new();
assert!(!caps.has_rust());
caps.rustc_version = Some("rustc 1.76.0 (07dca489a 2024-02-04)".to_string());
assert!(caps.has_rust());
}
#[test]
fn test_worker_capabilities_has_bun() {
let _guard = test_guard!();
let mut caps = WorkerCapabilities::new();
assert!(!caps.has_bun());
caps.bun_version = Some("1.0.25".to_string());
assert!(caps.has_bun());
}
#[test]
fn test_worker_capabilities_has_node() {
let _guard = test_guard!();
let mut caps = WorkerCapabilities::new();
assert!(!caps.has_node());
caps.node_version = Some("v20.11.0".to_string());
assert!(caps.has_node());
}
#[test]
fn test_worker_capabilities_multiple_runtimes() {
let _guard = test_guard!();
let caps = WorkerCapabilities {
rustc_version: Some("rustc 1.76.0".to_string()),
bun_version: Some("1.0.25".to_string()),
node_version: Some("v20.11.0".to_string()),
npm_version: Some("10.2.4".to_string()),
..Default::default()
};
assert!(caps.has_rust());
assert!(caps.has_bun());
assert!(caps.has_node());
}
#[test]
fn test_worker_capabilities_serialization_empty() {
let _guard = test_guard!();
let caps = WorkerCapabilities::new();
let json = serde_json::to_string(&caps).unwrap();
assert_eq!(json, "{}");
let parsed: WorkerCapabilities = serde_json::from_str(&json).unwrap();
assert!(!parsed.has_rust());
assert!(!parsed.has_bun());
assert!(!parsed.has_node());
}
#[test]
fn test_worker_capabilities_serialization_with_versions() {
let _guard = test_guard!();
let caps = WorkerCapabilities {
rustc_version: Some("rustc 1.76.0-nightly".to_string()),
bun_version: Some("1.0.25".to_string()),
node_version: None,
npm_version: None,
..Default::default()
};
let json = serde_json::to_string(&caps).unwrap();
assert!(json.contains("rustc_version"));
assert!(json.contains("rustc 1.76.0-nightly"));
assert!(json.contains("bun_version"));
assert!(json.contains("1.0.25"));
assert!(!json.contains("node_version"));
assert!(!json.contains("npm_version"));
let parsed: WorkerCapabilities = serde_json::from_str(&json).unwrap();
assert!(parsed.has_rust());
assert!(parsed.has_bun());
assert!(!parsed.has_node());
}
#[test]
fn test_worker_capabilities_deserialization_partial() {
let _guard = test_guard!();
let json = r#"{"bun_version": "1.0.0"}"#;
let caps: WorkerCapabilities = serde_json::from_str(json).unwrap();
assert!(!caps.has_rust());
assert!(caps.has_bun());
assert!(!caps.has_node());
assert_eq!(caps.bun_version, Some("1.0.0".to_string()));
}
#[test]
fn test_worker_capabilities_clone() {
let _guard = test_guard!();
let caps = WorkerCapabilities {
rustc_version: Some("1.76.0".to_string()),
bun_version: None,
node_version: Some("v20".to_string()),
npm_version: None,
projects_root_ok: Some(false),
projects_root_issue: Some("alias_missing".to_string()),
projects_root_checked_at_unix_ms: Some(123),
..Default::default()
};
let cloned = caps.clone();
assert_eq!(cloned.rustc_version, caps.rustc_version);
assert_eq!(cloned.bun_version, caps.bun_version);
assert_eq!(cloned.node_version, caps.node_version);
assert_eq!(cloned.npm_version, caps.npm_version);
assert_eq!(cloned.projects_root_ok, caps.projects_root_ok);
assert_eq!(cloned.projects_root_issue, caps.projects_root_issue);
assert_eq!(
cloned.projects_root_checked_at_unix_ms,
caps.projects_root_checked_at_unix_ms
);
}
#[test]
fn test_worker_capabilities_topology_health_helper() {
let _guard = test_guard!();
let mut caps = WorkerCapabilities::new();
assert_eq!(caps.is_topology_healthy(), None);
caps.projects_root_ok = Some(true);
assert_eq!(caps.is_topology_healthy(), Some(true));
caps.projects_root_ok = Some(false);
assert_eq!(caps.is_topology_healthy(), Some(false));
}
#[test]
fn test_selection_reason_serialization() {
let _guard = test_guard!();
assert_eq!(
serde_json::to_string(&SelectionReason::Success).unwrap(),
"\"success\""
);
assert_eq!(
serde_json::to_string(&SelectionReason::NoWorkersConfigured).unwrap(),
"\"no_workers_configured\""
);
assert_eq!(
serde_json::to_string(&SelectionReason::AllWorkersUnreachable).unwrap(),
"\"all_workers_unreachable\""
);
assert_eq!(
serde_json::to_string(&SelectionReason::AllCircuitsOpen).unwrap(),
"\"all_circuits_open\""
);
assert_eq!(
serde_json::to_string(&SelectionReason::AllWorkersBusy).unwrap(),
"\"all_workers_busy\""
);
assert_eq!(
serde_json::to_string(&SelectionReason::NoWorkersPassedHealth).unwrap(),
"\"no_workers_passed_health\""
);
assert_eq!(
serde_json::to_string(&SelectionReason::AllWorkersFailedPreflight).unwrap(),
"\"all_workers_failed_preflight\""
);
assert_eq!(
serde_json::to_string(&SelectionReason::AllWorkersFailedConvergence).unwrap(),
"\"all_workers_failed_convergence\""
);
assert_eq!(
serde_json::to_string(&SelectionReason::NoAdmissibleWorkers(
"critical_pressure=1,insufficient_slots=1".to_string()
))
.unwrap(),
"{\"no_admissible_workers\":\"critical_pressure=1,insufficient_slots=1\"}"
);
assert_eq!(
serde_json::to_string(&SelectionReason::NoMatchingWorkers).unwrap(),
"\"no_matching_workers\""
);
}
#[test]
fn test_selection_reason_with_error() {
let _guard = test_guard!();
let reason = SelectionReason::SelectionError("test error".to_string());
let json = serde_json::to_string(&reason).unwrap();
assert!(json.contains("selection_error"));
assert!(json.contains("test error"));
}
#[test]
fn test_selection_reason_deserialization() {
let _guard = test_guard!();
assert_eq!(
serde_json::from_str::<SelectionReason>("\"success\"").unwrap(),
SelectionReason::Success
);
assert_eq!(
serde_json::from_str::<SelectionReason>("\"all_workers_busy\"").unwrap(),
SelectionReason::AllWorkersBusy
);
assert_eq!(
serde_json::from_str::<SelectionReason>("\"no_workers_passed_health\"").unwrap(),
SelectionReason::NoWorkersPassedHealth
);
assert_eq!(
serde_json::from_str::<SelectionReason>("\"all_workers_failed_preflight\"").unwrap(),
SelectionReason::AllWorkersFailedPreflight
);
assert_eq!(
serde_json::from_str::<SelectionReason>("\"all_workers_failed_convergence\"").unwrap(),
SelectionReason::AllWorkersFailedConvergence
);
assert_eq!(
serde_json::from_str::<SelectionReason>(
"{\"no_admissible_workers\":\"critical_pressure=1\"}"
)
.unwrap(),
SelectionReason::NoAdmissibleWorkers("critical_pressure=1".to_string())
);
}
#[test]
fn test_selection_reason_display() {
let _guard = test_guard!();
assert_eq!(
SelectionReason::Success.to_string(),
"worker assigned successfully"
);
assert_eq!(
SelectionReason::NoWorkersConfigured.to_string(),
"no workers configured"
);
assert_eq!(
SelectionReason::AllWorkersUnreachable.to_string(),
"all workers unreachable"
);
assert_eq!(
SelectionReason::AllWorkersBusy.to_string(),
"all workers at capacity"
);
assert_eq!(
SelectionReason::NoWorkersPassedHealth.to_string(),
"no workers passed health thresholds"
);
assert_eq!(
SelectionReason::AllWorkersFailedPreflight.to_string(),
"all workers failed preflight checks"
);
assert_eq!(
SelectionReason::AllWorkersFailedConvergence.to_string(),
"all workers failed repo convergence checks"
);
assert_eq!(
SelectionReason::NoAdmissibleWorkers(
"critical_pressure=1,insufficient_slots=1".to_string()
)
.to_string(),
"no admissible workers: critical_pressure=1,insufficient_slots=1"
);
assert_eq!(
SelectionReason::SelectionError("oops".to_string()).to_string(),
"selection error: oops"
);
}
#[test]
fn test_selection_response_with_worker() {
let _guard = test_guard!();
let response = SelectionResponse {
worker: Some(SelectedWorker {
id: WorkerId::new("test"),
host: "localhost".to_string(),
user: "user".to_string(),
identity_file: "~/.ssh/id_rsa".to_string(),
slots_available: 8,
speed_score: 75.0,
}),
reason: SelectionReason::Success,
build_id: None,
};
let json = serde_json::to_string(&response).unwrap();
assert!(json.contains("\"reason\":\"success\""));
assert!(json.contains("\"id\":\"test\""));
}
#[test]
fn test_selection_response_without_worker() {
let _guard = test_guard!();
let response = SelectionResponse {
worker: None,
reason: SelectionReason::AllWorkersBusy,
build_id: None,
};
let json = serde_json::to_string(&response).unwrap();
assert!(json.contains("\"worker\":null"));
assert!(json.contains("\"reason\":\"all_workers_busy\""));
}
#[test]
fn test_selection_response_roundtrip() {
let _guard = test_guard!();
let original = SelectionResponse {
worker: Some(SelectedWorker {
id: WorkerId::new("worker-1"),
host: "192.168.1.100".to_string(),
user: "ubuntu".to_string(),
identity_file: "/path/to/key".to_string(),
slots_available: 16,
speed_score: 90.5,
}),
reason: SelectionReason::Success,
build_id: None,
};
let json = serde_json::to_string(&original).unwrap();
let parsed: SelectionResponse = serde_json::from_str(&json).unwrap();
assert!(parsed.worker.is_some());
let worker = parsed.worker.unwrap();
assert_eq!(worker.id.as_str(), "worker-1");
assert_eq!(worker.host, "192.168.1.100");
assert_eq!(worker.slots_available, 16);
assert_eq!(parsed.reason, SelectionReason::Success);
}
#[test]
fn test_build_heartbeat_phase_roundtrip() {
let _guard = test_guard!();
let json = serde_json::to_string(&BuildHeartbeatPhase::SyncUp).unwrap();
assert_eq!(json, "\"sync_up\"");
let parsed: BuildHeartbeatPhase = serde_json::from_str(&json).unwrap();
assert_eq!(parsed, BuildHeartbeatPhase::SyncUp);
}
#[test]
fn test_build_heartbeat_request_roundtrip() {
let _guard = test_guard!();
let request = BuildHeartbeatRequest {
build_id: 42,
worker_id: WorkerId::new("worker-a"),
hook_pid: Some(12345),
remote_pgid_file: Some("/tmp/rch/project/hash/.rch-run/42.pgid".to_string()),
phase: BuildHeartbeatPhase::Execute,
detail: Some("Compiling crates".to_string()),
progress_counter: Some(7),
progress_percent: Some(42.5),
};
let json = serde_json::to_string(&request).unwrap();
let parsed: BuildHeartbeatRequest = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.build_id, 42);
assert_eq!(parsed.worker_id.as_str(), "worker-a");
assert_eq!(
parsed.remote_pgid_file,
Some("/tmp/rch/project/hash/.rch-run/42.pgid".to_string())
);
assert_eq!(parsed.phase, BuildHeartbeatPhase::Execute);
assert_eq!(parsed.progress_counter, Some(7));
assert_eq!(parsed.progress_percent, Some(42.5));
}
#[test]
fn test_circuit_stats_new() {
let _guard = test_guard!();
let stats = CircuitStats::new();
assert_eq!(stats.state(), CircuitState::Closed);
assert_eq!(stats.consecutive_failures(), 0);
assert_eq!(stats.error_rate(), 0.0);
assert!(stats.opened_at().is_none());
}
#[test]
fn test_circuit_stats_record_success() {
let _guard = test_guard!();
let mut stats = CircuitStats::new();
stats.record_success();
stats.record_success();
assert_eq!(stats.error_rate(), 0.0);
assert_eq!(stats.consecutive_failures(), 0);
}
#[test]
fn test_circuit_stats_record_failure() {
let _guard = test_guard!();
let mut stats = CircuitStats::new();
stats.record_failure();
stats.record_failure();
assert_eq!(stats.consecutive_failures(), 2);
assert_eq!(stats.error_rate(), 1.0); }
#[test]
fn test_circuit_stats_error_rate() {
let _guard = test_guard!();
let mut stats = CircuitStats::new();
stats.record_success();
stats.record_success();
stats.record_failure();
assert!((stats.error_rate() - 0.333).abs() < 0.01);
}
#[test]
fn test_circuit_stats_should_open_consecutive_failures() {
let _guard = test_guard!();
let mut stats = CircuitStats::new();
let config = CircuitBreakerConfig::default();
stats.record_failure();
assert!(!stats.should_open(&config));
stats.record_failure();
assert!(!stats.should_open(&config));
stats.record_failure();
assert!(stats.should_open(&config)); }
#[test]
fn test_circuit_stats_should_open_error_rate() {
let _guard = test_guard!();
let mut stats = CircuitStats::new();
let config = CircuitBreakerConfig {
error_rate_threshold: 0.5,
..Default::default()
};
stats.record_success();
stats.record_success();
stats.record_failure();
stats.record_failure();
assert!(!stats.should_open(&config));
stats.record_failure(); assert!(stats.should_open(&config));
}
#[test]
fn test_circuit_stats_success_resets_consecutive_failures() {
let _guard = test_guard!();
let mut stats = CircuitStats::new();
stats.record_failure();
stats.record_failure();
assert_eq!(stats.consecutive_failures(), 2);
stats.record_success();
assert_eq!(stats.consecutive_failures(), 0);
}
#[test]
fn test_circuit_stats_open_transition() {
let _guard = test_guard!();
let mut stats = CircuitStats::new();
stats.open();
assert_eq!(stats.state(), CircuitState::Open);
assert!(stats.opened_at().is_some());
}
#[test]
fn test_circuit_stats_half_open_transition() {
let _guard = test_guard!();
let mut stats = CircuitStats::new();
stats.open();
stats.half_open();
assert_eq!(stats.state(), CircuitState::HalfOpen);
}
#[test]
fn test_circuit_stats_close_transition() {
let _guard = test_guard!();
let mut stats = CircuitStats::new();
stats.open();
stats.half_open();
stats.close();
assert_eq!(stats.state(), CircuitState::Closed);
assert!(stats.opened_at().is_none());
assert_eq!(stats.consecutive_failures(), 0);
}
#[test]
fn test_circuit_stats_should_close() {
let _guard = test_guard!();
let mut stats = CircuitStats::new();
let config = CircuitBreakerConfig {
success_threshold: 2,
..Default::default()
};
stats.open();
stats.half_open();
assert!(!stats.should_close(&config));
stats.record_success();
assert!(!stats.should_close(&config));
stats.record_success();
assert!(stats.should_close(&config)); }
#[test]
fn test_circuit_stats_can_probe() {
let _guard = test_guard!();
let mut stats = CircuitStats::new();
let config = CircuitBreakerConfig {
half_open_max_probes: 1,
..Default::default()
};
assert!(!stats.can_probe(&config));
stats.open();
assert!(!stats.can_probe(&config));
stats.half_open();
assert!(stats.can_probe(&config));
assert!(stats.start_probe(&config));
assert!(!stats.can_probe(&config));
assert!(!stats.start_probe(&config));
}
#[test]
fn test_circuit_stats_probe_completion() {
let _guard = test_guard!();
let mut stats = CircuitStats::new();
let config = CircuitBreakerConfig {
half_open_max_probes: 1,
..Default::default()
};
stats.open();
stats.half_open();
stats.start_probe(&config);
stats.record_success();
assert!(stats.can_probe(&config));
}
#[test]
fn test_circuit_stats_failure_in_half_open() {
let _guard = test_guard!();
let mut stats = CircuitStats::new();
let config = CircuitBreakerConfig {
success_threshold: 2,
..Default::default()
};
stats.open();
stats.half_open();
stats.record_success();
assert_eq!(stats.consecutive_failures(), 0);
stats.record_failure();
assert!(!stats.should_close(&config));
stats.record_success();
stats.record_success();
assert!(stats.should_close(&config));
}
#[test]
fn test_circuit_stats_reset_window() {
let _guard = test_guard!();
let mut stats = CircuitStats::new();
stats.record_success();
stats.record_failure();
assert!(stats.error_rate() > 0.0);
stats.reset_window();
assert_eq!(stats.error_rate(), 0.0);
}
#[test]
fn test_circuit_state_transitions_are_deterministic() {
let _guard = test_guard!();
let config = CircuitBreakerConfig::default();
let mut stats = CircuitStats::new();
assert_eq!(stats.state(), CircuitState::Closed);
for _ in 0..3 {
stats.record_failure();
}
assert!(stats.should_open(&config));
stats.open();
assert_eq!(stats.state(), CircuitState::Open);
stats.half_open();
assert_eq!(stats.state(), CircuitState::HalfOpen);
for _ in 0..2 {
stats.record_success();
}
assert!(stats.should_close(&config));
stats.close();
assert_eq!(stats.state(), CircuitState::Closed);
}
fn make_test_metrics(
speedup: Option<f64>,
total_secs: u64,
success: bool,
) -> CompilationMetrics {
CompilationMetrics {
project_id: "test-project".to_string(),
worker_id: "worker-1".to_string(),
timestamp: chrono::Utc::now(),
timing: CompilationTimingBreakdown {
rsync_up: Duration::from_millis(100),
remote_build: Duration::from_millis(800),
rsync_down: Duration::from_millis(100),
total: Duration::from_secs(total_secs),
},
local_build_time: speedup.map(|s| Duration::from_secs_f64(total_secs as f64 * s)),
speedup,
files_synced: 100,
bytes_transferred: 1_000_000,
exit_code: if success { 0 } else { 1 },
success,
}
}
#[test]
fn test_compilation_timing_breakdown_default() {
let _guard = test_guard!();
let timing = CompilationTimingBreakdown::default();
assert_eq!(timing.rsync_up, Duration::ZERO);
assert_eq!(timing.remote_build, Duration::ZERO);
assert_eq!(timing.rsync_down, Duration::ZERO);
assert_eq!(timing.total, Duration::ZERO);
}
#[test]
fn test_compilation_timing_breakdown_serialization() {
let _guard = test_guard!();
let timing = CompilationTimingBreakdown {
rsync_up: Duration::from_millis(100),
remote_build: Duration::from_millis(2000),
rsync_down: Duration::from_millis(50),
total: Duration::from_millis(2150),
};
let json = serde_json::to_string(&timing).unwrap();
assert!(json.contains("100")); assert!(json.contains("2000")); assert!(json.contains("50")); assert!(json.contains("2150"));
let parsed: CompilationTimingBreakdown = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.rsync_up, timing.rsync_up);
assert_eq!(parsed.remote_build, timing.remote_build);
assert_eq!(parsed.rsync_down, timing.rsync_down);
assert_eq!(parsed.total, timing.total);
}
#[test]
fn test_command_timing_breakdown_serialization() {
let _guard = test_guard!();
let timing = CommandTimingBreakdown {
classify: Some(Duration::from_millis(2)),
select: None,
sync_up: Some(Duration::from_millis(120)),
exec: Some(Duration::from_millis(900)),
sync_down: None,
cleanup: Some(Duration::from_millis(5)),
total: Some(Duration::from_millis(1027)),
};
let json = serde_json::to_string(&timing).unwrap();
assert!(json.contains("\"classify\":2"));
assert!(json.contains("\"select\":null"));
assert!(json.contains("\"sync_up\":120"));
assert!(json.contains("\"exec\":900"));
assert!(json.contains("\"sync_down\":null"));
assert!(json.contains("\"cleanup\":5"));
assert!(json.contains("\"total\":1027"));
let parsed: CommandTimingBreakdown = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.classify, timing.classify);
assert_eq!(parsed.select, timing.select);
assert_eq!(parsed.sync_up, timing.sync_up);
assert_eq!(parsed.exec, timing.exec);
assert_eq!(parsed.sync_down, timing.sync_down);
assert_eq!(parsed.cleanup, timing.cleanup);
assert_eq!(parsed.total, timing.total);
}
#[test]
fn test_compilation_metrics_calculate_speedup() {
let _guard = test_guard!();
let mut metrics = CompilationMetrics {
timing: CompilationTimingBreakdown {
total: Duration::from_secs(10),
..Default::default()
},
local_build_time: Some(Duration::from_secs(30)),
..Default::default()
};
metrics.calculate_speedup();
assert!(metrics.speedup.is_some());
let speedup = metrics.speedup.unwrap();
assert!(
(speedup - 3.0).abs() < 0.01,
"Expected 3.0x speedup, got {}",
speedup
);
}
#[test]
fn test_compilation_metrics_is_beneficial() {
let _guard = test_guard!();
let mut metrics = CompilationMetrics::default();
assert!(!metrics.is_beneficial());
metrics.speedup = Some(1.5);
assert!(metrics.is_beneficial());
metrics.speedup = Some(0.8);
assert!(!metrics.is_beneficial());
metrics.speedup = Some(1.0);
assert!(!metrics.is_beneficial());
}
#[test]
fn test_compilation_metrics_serialization() {
let _guard = test_guard!();
let metrics = make_test_metrics(Some(2.5), 10, true);
let json = serde_json::to_string(&metrics).unwrap();
assert!(json.contains("test-project"));
assert!(json.contains("worker-1"));
assert!(json.contains("2.5"));
let parsed: CompilationMetrics = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.project_id, "test-project");
assert_eq!(parsed.worker_id, "worker-1");
assert!(parsed.success);
}
#[test]
fn test_compilation_timer_new() {
let _guard = test_guard!();
let timer = CompilationTimer::new("my-project", "my-worker");
assert_eq!(timer.project_id, "my-project");
assert_eq!(timer.worker_id, "my-worker");
assert!(timer.rsync_up.is_none());
assert!(timer.remote_build.is_none());
assert!(timer.rsync_down.is_none());
}
#[test]
fn test_compilation_timer_phases() {
let _guard = test_guard!();
let mut timer = CompilationTimer::new("test", "worker");
std::thread::sleep(Duration::from_millis(5));
timer.end_rsync_up();
assert!(timer.rsync_up.is_some());
assert!(timer.rsync_up.unwrap() >= Duration::from_millis(5));
std::thread::sleep(Duration::from_millis(10));
timer.end_remote_build();
assert!(timer.remote_build.is_some());
assert!(timer.remote_build.unwrap() >= Duration::from_millis(10));
std::thread::sleep(Duration::from_millis(5));
timer.end_rsync_down();
assert!(timer.rsync_down.is_some());
assert!(timer.rsync_down.unwrap() >= Duration::from_millis(5));
let metrics = timer.finish(0, 50, 500_000);
assert_eq!(metrics.project_id, "test");
assert_eq!(metrics.worker_id, "worker");
assert!(metrics.success);
assert!(metrics.timing.total >= Duration::from_millis(20));
assert_eq!(metrics.files_synced, 50);
assert_eq!(metrics.bytes_transferred, 500_000);
}
#[test]
fn test_metrics_aggregator_new() {
let _guard = test_guard!();
let agg = MetricsAggregator::new(100);
assert_eq!(agg.count(), 0);
assert!(agg.average_speedup().is_none());
}
#[test]
fn test_metrics_aggregator_record() {
let _guard = test_guard!();
let mut agg = MetricsAggregator::new(3);
agg.record(make_test_metrics(Some(2.0), 10, true));
assert_eq!(agg.count(), 1);
agg.record(make_test_metrics(Some(3.0), 20, true));
agg.record(make_test_metrics(Some(4.0), 30, true));
assert_eq!(agg.count(), 3);
agg.record(make_test_metrics(Some(5.0), 40, true));
assert_eq!(agg.count(), 3);
}
#[test]
fn test_metrics_aggregator_average_speedup() {
let _guard = test_guard!();
let mut agg = MetricsAggregator::new(100);
assert!(agg.average_speedup().is_none());
for i in 1..=5 {
agg.record(make_test_metrics(Some(i as f64), 10, true));
}
let avg = agg.average_speedup().unwrap();
assert!((avg - 3.0).abs() < 0.01, "Expected 3.0, got {}", avg);
}
#[test]
fn test_metrics_aggregator_average_speedup_with_none() {
let _guard = test_guard!();
let mut agg = MetricsAggregator::new(100);
agg.record(make_test_metrics(Some(2.0), 10, true));
agg.record(make_test_metrics(None, 10, true)); agg.record(make_test_metrics(Some(4.0), 10, true));
let avg = agg.average_speedup().unwrap();
assert!((avg - 3.0).abs() < 0.01, "Expected 3.0, got {}", avg);
}
#[test]
fn test_metrics_aggregator_percentiles() {
let _guard = test_guard!();
let mut agg = MetricsAggregator::new(100);
for i in 1..=10 {
agg.record(make_test_metrics(Some(1.0), i, true));
}
let p50 = agg.p50_total_time().unwrap();
assert!(p50 >= Duration::from_secs(5) && p50 <= Duration::from_secs(6));
let p95 = agg.p95_total_time().unwrap();
assert!(p95 >= Duration::from_secs(9) && p95 <= Duration::from_secs(10));
let p99 = agg.p99_total_time().unwrap();
assert!(p99 >= Duration::from_secs(9));
}
#[test]
fn test_metrics_aggregator_success_rate() {
let _guard = test_guard!();
let mut agg = MetricsAggregator::new(100);
assert_eq!(agg.success_rate(), 100.0);
agg.record(make_test_metrics(Some(1.0), 10, true));
agg.record(make_test_metrics(Some(1.0), 10, true));
assert_eq!(agg.success_rate(), 100.0);
agg.record(make_test_metrics(Some(1.0), 10, false));
let rate = agg.success_rate();
assert!((rate - 66.67).abs() < 1.0, "Expected ~66.67%, got {}", rate);
}
#[test]
fn test_metrics_aggregator_clear() {
let _guard = test_guard!();
let mut agg = MetricsAggregator::new(100);
agg.record(make_test_metrics(Some(1.0), 10, true));
agg.record(make_test_metrics(Some(2.0), 20, true));
assert_eq!(agg.count(), 2);
agg.clear();
assert_eq!(agg.count(), 0);
assert!(agg.average_speedup().is_none());
}
#[test]
fn test_compilation_config_default_timeouts() {
let _guard = test_guard!();
let config = CompilationConfig::default();
assert_eq!(config.build_timeout_sec, 300);
assert_eq!(config.test_timeout_sec, 1800);
}
#[test]
fn test_compilation_config_timeout_for_test_kinds() {
let _guard = test_guard!();
let config = CompilationConfig::default();
assert_eq!(
config.timeout_for_kind(Some(crate::CompilationKind::CargoTest)),
std::time::Duration::from_secs(1800)
);
assert_eq!(
config.timeout_for_kind(Some(crate::CompilationKind::CargoNextest)),
std::time::Duration::from_secs(1800)
);
assert_eq!(
config.timeout_for_kind(Some(crate::CompilationKind::BunTest)),
std::time::Duration::from_secs(config.bun_timeout_sec)
);
}
#[test]
fn test_compilation_config_timeout_for_build_kinds() {
let _guard = test_guard!();
let config = CompilationConfig::default();
assert_eq!(
config.timeout_for_kind(Some(crate::CompilationKind::CargoBuild)),
std::time::Duration::from_secs(300)
);
assert_eq!(
config.timeout_for_kind(Some(crate::CompilationKind::CargoCheck)),
std::time::Duration::from_secs(300)
);
assert_eq!(
config.timeout_for_kind(Some(crate::CompilationKind::CargoClippy)),
std::time::Duration::from_secs(300)
);
assert_eq!(
config.timeout_for_kind(None),
std::time::Duration::from_secs(300)
);
}
#[test]
fn test_compilation_config_custom_timeouts() {
let _guard = test_guard!();
let config = CompilationConfig {
build_timeout_sec: 600, test_timeout_sec: 3600, ..Default::default()
};
assert_eq!(
config.timeout_for_kind(Some(crate::CompilationKind::CargoBuild)),
std::time::Duration::from_secs(600)
);
assert_eq!(
config.timeout_for_kind(Some(crate::CompilationKind::CargoTest)),
std::time::Duration::from_secs(3600)
);
}
#[test]
fn test_compilation_config_speedup_threshold_default() {
let _guard = test_guard!();
let config = CompilationConfig::default();
assert!((config.remote_speedup_threshold - 1.2).abs() < 0.001);
}
#[test]
fn test_compilation_config_speedup_threshold_custom() {
let _guard = test_guard!();
let config = CompilationConfig {
remote_speedup_threshold: 1.5, ..Default::default()
};
assert!((config.remote_speedup_threshold - 1.5).abs() < 0.001);
}
#[test]
fn test_compilation_config_speedup_threshold_no_minimum() {
let _guard = test_guard!();
let config = CompilationConfig {
remote_speedup_threshold: 1.0,
..Default::default()
};
assert!((config.remote_speedup_threshold - 1.0).abs() < 0.001);
}
#[test]
fn test_compilation_config_min_local_time_ms_default() {
let _guard = test_guard!();
let config = CompilationConfig::default();
assert_eq!(config.min_local_time_ms, 2000);
}
#[test]
fn test_compilation_config_serde_roundtrip_speedup_threshold() {
let _guard = test_guard!();
let config = CompilationConfig {
remote_speedup_threshold: 2.5,
min_local_time_ms: 5000,
..Default::default()
};
let json = serde_json::to_string(&config).unwrap();
let parsed: CompilationConfig = serde_json::from_str(&json).unwrap();
assert!((parsed.remote_speedup_threshold - 2.5).abs() < 0.001);
assert_eq!(parsed.min_local_time_ms, 5000);
}
#[test]
fn test_validate_remote_base_absolute_path() {
let _guard = test_guard!();
assert_eq!(validate_remote_base("/tmp/rch").unwrap(), "/tmp/rch");
assert_eq!(
validate_remote_base("/var/rch-builds").unwrap(),
"/var/rch-builds"
);
assert_eq!(
validate_remote_base("/home/builder/.rch").unwrap(),
"/home/builder/.rch"
);
}
#[test]
fn test_validate_remote_base_tilde_expansion() {
let _guard = test_guard!();
let result = validate_remote_base("~/rch");
assert!(result.is_ok());
let path = result.unwrap();
assert!(
path.starts_with('/'),
"Path should be absolute after expansion: {}",
path
);
assert!(!path.contains('~'), "Tilde should be expanded: {}", path);
}
#[test]
fn test_validate_remote_base_rejects_relative_path() {
let _guard = test_guard!();
let result = validate_remote_base("tmp/rch");
assert!(result.is_err());
assert!(result.unwrap_err().contains("absolute path"));
}
#[test]
fn test_validate_remote_base_rejects_path_traversal() {
let _guard = test_guard!();
let result = validate_remote_base("/tmp/../etc/rch");
assert!(result.is_err());
assert!(result.unwrap_err().contains("path traversal"));
let result = validate_remote_base("/tmp/rch/../other");
assert!(result.is_err());
assert!(result.unwrap_err().contains("path traversal"));
}
#[test]
fn test_validate_remote_base_normalizes_trailing_slash() {
let _guard = test_guard!();
assert_eq!(validate_remote_base("/tmp/rch/").unwrap(), "/tmp/rch");
assert_eq!(validate_remote_base("/tmp/rch///").unwrap(), "/tmp/rch");
}
#[test]
fn test_validate_remote_base_root_path() {
let _guard = test_guard!();
let result = validate_remote_base("/");
assert!(result.is_err());
assert!(result.unwrap_err().contains("root directory"));
}
#[test]
fn test_validate_remote_base_rejects_top_level() {
let _guard = test_guard!();
let result = validate_remote_base("/tmp");
assert!(result.is_err());
assert!(result.unwrap_err().contains("at least 2 levels deep"));
let result = validate_remote_base("/home");
assert!(result.is_err());
assert_eq!(validate_remote_base("/tmp/rch").unwrap(), "/tmp/rch");
}
#[test]
fn test_default_remote_base() {
let _guard = test_guard!();
assert_eq!(default_remote_base(), "/tmp/rch");
}
#[test]
fn test_transfer_config_default_has_remote_base() {
let _guard = test_guard!();
let config = TransferConfig::default();
assert_eq!(config.remote_base, "/tmp/rch");
}
#[test]
fn test_retry_config_default() {
let _guard = test_guard!();
let config = RetryConfig::default();
assert_eq!(config.max_attempts, 3);
assert_eq!(config.base_delay_ms, 100);
assert_eq!(config.max_delay_ms, 5000);
assert_eq!(config.jitter_factor, 0.1);
assert_eq!(config.total_timeout_ms, 30000);
}
#[test]
fn test_retry_config_no_retry() {
let _guard = test_guard!();
let config = RetryConfig::no_retry();
assert_eq!(config.max_attempts, 1);
}
#[test]
fn test_retry_config_delay_for_attempt_zero() {
let _guard = test_guard!();
let config = RetryConfig::default();
assert_eq!(config.delay_for_attempt(0), std::time::Duration::ZERO);
}
#[test]
fn test_retry_config_delay_for_attempt_exponential() {
let _guard = test_guard!();
let config = RetryConfig {
base_delay_ms: 100,
max_delay_ms: 10000,
jitter_factor: 0.0, ..Default::default()
};
assert_eq!(
config.delay_for_attempt(1),
std::time::Duration::from_millis(100)
);
assert_eq!(
config.delay_for_attempt(2),
std::time::Duration::from_millis(200)
);
assert_eq!(
config.delay_for_attempt(3),
std::time::Duration::from_millis(400)
);
}
#[test]
fn test_retry_config_delay_capped_at_max() {
let _guard = test_guard!();
let config = RetryConfig {
base_delay_ms: 1000,
max_delay_ms: 2000,
jitter_factor: 0.0,
..Default::default()
};
assert_eq!(
config.delay_for_attempt(2),
std::time::Duration::from_millis(2000)
);
assert_eq!(
config.delay_for_attempt(3),
std::time::Duration::from_millis(2000)
);
}
#[test]
fn test_retry_config_should_retry() {
let _guard = test_guard!();
let config = RetryConfig {
max_attempts: 3,
total_timeout_ms: 1000,
..Default::default()
};
assert!(config.should_retry(0, std::time::Duration::from_millis(0)));
assert!(config.should_retry(1, std::time::Duration::from_millis(500)));
assert!(config.should_retry(2, std::time::Duration::from_millis(900)));
assert!(!config.should_retry(3, std::time::Duration::from_millis(0)));
assert!(!config.should_retry(1, std::time::Duration::from_millis(1001)));
}
#[test]
fn test_retry_config_serde_roundtrip() {
let _guard = test_guard!();
let config = RetryConfig {
max_attempts: 5,
base_delay_ms: 200,
max_delay_ms: 8000,
jitter_factor: 0.2,
total_timeout_ms: 60000,
};
let json = serde_json::to_string(&config).unwrap();
let parsed: RetryConfig = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.max_attempts, 5);
assert_eq!(parsed.base_delay_ms, 200);
assert_eq!(parsed.max_delay_ms, 8000);
assert_eq!(parsed.jitter_factor, 0.2);
assert_eq!(parsed.total_timeout_ms, 60000);
}
#[test]
fn test_transfer_config_includes_retry() {
let _guard = test_guard!();
let config = TransferConfig::default();
assert_eq!(config.retry.max_attempts, 3);
}
#[test]
fn test_execution_config_default_allowlist() {
let _guard = test_guard!();
let config = ExecutionConfig::default();
assert!(config.is_allowed("cargo"));
assert!(config.is_allowed("rustc"));
assert!(config.is_allowed("gcc"));
assert!(config.is_allowed("g++"));
assert!(config.is_allowed("clang"));
assert!(config.is_allowed("clang++"));
assert!(config.is_allowed("make"));
assert!(config.is_allowed("cmake"));
assert!(config.is_allowed("ninja"));
assert!(config.is_allowed("meson"));
assert!(config.is_allowed("bun"));
assert!(config.is_allowed("nextest"));
assert!(config.is_allowed("cc"));
assert!(config.is_allowed("c++"));
}
#[test]
fn test_execution_config_is_allowed_case_insensitive() {
let _guard = test_guard!();
let config = ExecutionConfig::default();
assert!(config.is_allowed("CARGO"));
assert!(config.is_allowed("Cargo"));
assert!(config.is_allowed("GCC"));
assert!(config.is_allowed("Gcc"));
}
#[test]
fn test_execution_config_is_not_allowed() {
let _guard = test_guard!();
let config = ExecutionConfig::default();
assert!(!config.is_allowed("python"));
assert!(!config.is_allowed("npm"));
assert!(!config.is_allowed("go"));
assert!(!config.is_allowed(""));
}
#[test]
fn test_execution_config_empty_allowlist() {
let _guard = test_guard!();
let config = ExecutionConfig { allowlist: vec![] };
assert!(!config.is_allowed("cargo"));
assert!(!config.is_allowed("gcc"));
}
#[test]
fn test_execution_config_custom_allowlist() {
let _guard = test_guard!();
let config = ExecutionConfig {
allowlist: vec!["cargo".to_string(), "custom_tool".to_string()],
};
assert!(config.is_allowed("cargo"));
assert!(config.is_allowed("custom_tool"));
assert!(!config.is_allowed("gcc"));
}
#[test]
fn test_execution_config_serde() {
let _guard = test_guard!();
let config = ExecutionConfig {
allowlist: vec!["cargo".to_string(), "rustc".to_string()],
};
let json = serde_json::to_string(&config).unwrap();
let parsed: ExecutionConfig = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.allowlist, config.allowlist);
}
#[test]
fn test_rch_config_includes_execution() {
let _guard = test_guard!();
let config = RchConfig::default();
assert!(config.execution.is_allowed("cargo"));
}
#[test]
fn test_adaptive_compression_disabled_by_default() {
let _guard = test_guard!();
let config = TransferConfig::default();
assert!(!config.adaptive_compression);
assert_eq!(config.select_compression_level(Some(1_000_000)), 3);
assert_eq!(config.select_compression_level(Some(500_000_000)), 3);
}
#[test]
fn test_adaptive_compression_small_payload() {
let _guard = test_guard!();
let config = TransferConfig {
adaptive_compression: true,
..Default::default()
};
assert_eq!(config.select_compression_level(Some(0)), 1);
assert_eq!(config.select_compression_level(Some(1_000_000)), 1); assert_eq!(config.select_compression_level(Some(9_999_999)), 1); }
#[test]
fn test_adaptive_compression_medium_payload() {
let _guard = test_guard!();
let config = TransferConfig {
adaptive_compression: true,
..Default::default()
};
assert_eq!(config.select_compression_level(Some(10_000_000)), 3); assert_eq!(config.select_compression_level(Some(100_000_000)), 3); assert_eq!(config.select_compression_level(Some(199_999_999)), 3); }
#[test]
fn test_adaptive_compression_large_payload() {
let _guard = test_guard!();
let config = TransferConfig {
adaptive_compression: true,
..Default::default()
};
assert_eq!(config.select_compression_level(Some(200_000_000)), 7); assert_eq!(config.select_compression_level(Some(500_000_000)), 7); assert_eq!(config.select_compression_level(Some(1_000_000_000)), 7); }
#[test]
fn test_adaptive_compression_no_estimate() {
let _guard = test_guard!();
let config = TransferConfig {
adaptive_compression: true,
compression_level: 5,
..Default::default()
};
assert_eq!(config.select_compression_level(None), 5);
}
#[test]
fn test_adaptive_compression_respects_min_level() {
let _guard = test_guard!();
let config = TransferConfig {
adaptive_compression: true,
min_compression_level: 3,
..Default::default()
};
assert_eq!(config.select_compression_level(Some(1_000_000)), 3);
}
#[test]
fn test_adaptive_compression_respects_max_level() {
let _guard = test_guard!();
let config = TransferConfig {
adaptive_compression: true,
max_compression_level: 5,
..Default::default()
};
assert_eq!(config.select_compression_level(Some(500_000_000)), 5);
}
#[test]
fn test_adaptive_compression_serde_roundtrip() {
let _guard = test_guard!();
let config = TransferConfig {
adaptive_compression: true,
min_compression_level: 2,
max_compression_level: 8,
..Default::default()
};
let json = serde_json::to_string(&config).unwrap();
let parsed: TransferConfig = serde_json::from_str(&json).unwrap();
assert!(parsed.adaptive_compression);
assert_eq!(parsed.min_compression_level, 2);
assert_eq!(parsed.max_compression_level, 8);
}
#[test]
fn test_self_healing_config_defaults() {
let _guard = test_guard!();
let config = SelfHealingConfig::default();
assert!(
config.hook_starts_daemon,
"hook_starts_daemon should be true by default"
);
assert!(
config.daemon_installs_hooks,
"daemon_installs_hooks should be true by default"
);
assert_eq!(
config.auto_start_cooldown_secs, 30,
"auto_start_cooldown_secs should default to 30"
);
assert_eq!(
config.auto_start_timeout_secs, 3,
"auto_start_timeout_secs should default to 3"
);
}
#[test]
fn test_self_healing_config_serde_full() {
let _guard = test_guard!();
let config = SelfHealingConfig {
hook_starts_daemon: false,
daemon_installs_hooks: false,
auto_start_cooldown_secs: 60,
auto_start_timeout_secs: 10,
self_healing_log_level: SelfHealingLogLevel::Debug,
};
let json = serde_json::to_string(&config).unwrap();
assert!(json.contains("\"hook_starts_daemon\":false"));
assert!(json.contains("\"daemon_installs_hooks\":false"));
assert!(json.contains("\"auto_start_cooldown_secs\":60"));
assert!(json.contains("\"auto_start_timeout_secs\":10"));
assert!(json.contains("\"self_healing_log_level\":\"debug\""));
let parsed: SelfHealingConfig = serde_json::from_str(&json).unwrap();
assert!(!parsed.hook_starts_daemon);
assert!(!parsed.daemon_installs_hooks);
assert_eq!(parsed.auto_start_cooldown_secs, 60);
assert_eq!(parsed.auto_start_timeout_secs, 10);
assert_eq!(parsed.self_healing_log_level, SelfHealingLogLevel::Debug);
}
#[test]
fn test_self_healing_log_level_default_is_info() {
let level = SelfHealingLogLevel::default();
assert_eq!(level, SelfHealingLogLevel::Info);
let config = SelfHealingConfig::default();
assert_eq!(
config.self_healing_log_level,
SelfHealingLogLevel::Info,
"config default should be Info"
);
}
#[test]
fn test_self_healing_log_level_serializes_lowercase() {
let json = serde_json::to_string(&SelfHealingLogLevel::Warn).unwrap();
assert_eq!(json, "\"warn\"");
let parsed: SelfHealingLogLevel = serde_json::from_str("\"error\"").unwrap();
assert_eq!(parsed, SelfHealingLogLevel::Error);
}
#[test]
fn test_self_healing_log_level_from_env_str_accepts_known() {
assert_eq!(
SelfHealingLogLevel::from_env_str("debug"),
Some(SelfHealingLogLevel::Debug)
);
assert_eq!(
SelfHealingLogLevel::from_env_str("DEBUG"),
Some(SelfHealingLogLevel::Debug)
);
assert_eq!(
SelfHealingLogLevel::from_env_str("info"),
Some(SelfHealingLogLevel::Info)
);
assert_eq!(
SelfHealingLogLevel::from_env_str("warn"),
Some(SelfHealingLogLevel::Warn)
);
assert_eq!(
SelfHealingLogLevel::from_env_str("warning"),
Some(SelfHealingLogLevel::Warn),
"warning should map to Warn"
);
assert_eq!(
SelfHealingLogLevel::from_env_str("error"),
Some(SelfHealingLogLevel::Error)
);
assert_eq!(
SelfHealingLogLevel::from_env_str(" debug "),
Some(SelfHealingLogLevel::Debug)
);
}
#[test]
fn test_self_healing_log_level_from_env_str_rejects_unknown() {
assert!(SelfHealingLogLevel::from_env_str("banana").is_none());
assert!(SelfHealingLogLevel::from_env_str("").is_none());
assert!(SelfHealingLogLevel::from_env_str("trace").is_none());
}
#[test]
fn test_self_healing_config_serde_partial_uses_defaults() {
let _guard = test_guard!();
let json = r#"{"hook_starts_daemon": false}"#;
let config: SelfHealingConfig = serde_json::from_str(json).unwrap();
assert!(
!config.hook_starts_daemon,
"Explicit false should be parsed"
);
assert!(
config.daemon_installs_hooks,
"Missing field should use default (true)"
);
assert_eq!(
config.auto_start_cooldown_secs, 30,
"Missing field should use default (30)"
);
assert_eq!(
config.auto_start_timeout_secs, 3,
"Missing field should use default (3)"
);
}
#[test]
fn test_self_healing_config_toml_with_alias() {
let _guard = test_guard!();
let toml_str = r#"
hook_starts_daemon = true
daemon_installs_hooks = false
auto_start_cooldown_secs = 45
daemon_start_timeout = 7
"#;
let config: SelfHealingConfig = toml::from_str(toml_str).unwrap();
assert!(config.hook_starts_daemon);
assert!(!config.daemon_installs_hooks);
assert_eq!(config.auto_start_cooldown_secs, 45);
assert_eq!(
config.auto_start_timeout_secs, 7,
"daemon_start_timeout alias should set auto_start_timeout_secs"
);
}
#[allow(unsafe_code)]
mod self_healing_env_override_tests {
use super::*;
use crate::config::env_test_lock;
fn env_guard() -> std::sync::MutexGuard<'static, ()> {
env_test_lock()
}
fn set_env(key: &str, value: &str) {
unsafe { std::env::set_var(key, value) };
}
fn remove_env(key: &str) {
unsafe { std::env::remove_var(key) };
}
struct EnvVarGuard {
key: &'static str,
old: Option<String>,
}
impl EnvVarGuard {
fn set(key: &'static str, value: &str) -> Self {
let old = std::env::var(key).ok();
set_env(key, value);
Self { key, old }
}
}
impl Drop for EnvVarGuard {
fn drop(&mut self) {
if let Some(old) = &self.old {
set_env(self.key, old);
} else {
remove_env(self.key);
}
}
}
#[test]
fn test_self_healing_config_with_env_overrides_master_disable() {
let _guard = test_guard!();
let _guard = env_guard();
let _no_self_healing = EnvVarGuard::set("RCH_NO_SELF_HEALING", "1");
let _hook_starts_daemon = EnvVarGuard::set("RCH_HOOK_STARTS_DAEMON", "1");
let _daemon_installs_hooks = EnvVarGuard::set("RCH_DAEMON_INSTALLS_HOOKS", "1");
let _cooldown = EnvVarGuard::set("RCH_AUTO_START_COOLDOWN_SECS", "99");
let _timeout = EnvVarGuard::set("RCH_AUTO_START_TIMEOUT_SECS", "99");
let config = SelfHealingConfig::default().with_env_overrides();
assert!(
!config.hook_starts_daemon,
"RCH_NO_SELF_HEALING should disable hook auto-start"
);
assert!(
!config.daemon_installs_hooks,
"RCH_NO_SELF_HEALING should disable daemon hook installation"
);
assert_eq!(config.auto_start_cooldown_secs, 30);
assert_eq!(config.auto_start_timeout_secs, 3);
}
#[test]
fn test_self_healing_config_with_env_overrides_toggles_and_numbers() {
let _guard = test_guard!();
let _guard = env_guard();
let _hook_starts_daemon = EnvVarGuard::set("RCH_HOOK_STARTS_DAEMON", "0");
let _daemon_installs_hooks = EnvVarGuard::set("RCH_DAEMON_INSTALLS_HOOKS", "false");
let _cooldown = EnvVarGuard::set("RCH_AUTO_START_COOLDOWN_SECS", "45");
let _timeout = EnvVarGuard::set("RCH_AUTO_START_TIMEOUT_SECS", "7");
let config = SelfHealingConfig::default().with_env_overrides();
assert!(!config.hook_starts_daemon);
assert!(!config.daemon_installs_hooks);
assert_eq!(config.auto_start_cooldown_secs, 45);
assert_eq!(config.auto_start_timeout_secs, 7);
}
#[test]
fn test_self_healing_config_with_env_overrides_invalid_numbers_ignored() {
let _guard = test_guard!();
let _guard = env_guard();
let _cooldown = EnvVarGuard::set("RCH_AUTO_START_COOLDOWN_SECS", "not-a-number");
let _timeout = EnvVarGuard::set("RCH_AUTO_START_TIMEOUT_SECS", "nope");
let config = SelfHealingConfig::default().with_env_overrides();
assert_eq!(config.auto_start_cooldown_secs, 30);
assert_eq!(config.auto_start_timeout_secs, 3);
}
}
#[test]
fn test_load_per_core_calculation() {
let _guard = test_guard!();
let mut caps = WorkerCapabilities::new();
assert!(caps.load_per_core().is_none());
caps.load_avg_1 = Some(4.0);
assert!(caps.load_per_core().is_none());
caps.num_cpus = Some(4);
assert_eq!(caps.load_per_core(), Some(1.0));
caps.load_avg_1 = Some(16.0);
assert_eq!(caps.load_per_core(), Some(4.0)); }
#[test]
fn test_is_high_load() {
let _guard = test_guard!();
let mut caps = WorkerCapabilities::new();
caps.load_avg_1 = Some(8.0);
caps.num_cpus = Some(4);
assert_eq!(caps.is_high_load(3.0), Some(false));
assert_eq!(caps.is_high_load(2.0), Some(false)); assert_eq!(caps.is_high_load(1.5), Some(true));
caps.load_avg_1 = None;
assert!(caps.is_high_load(1.0).is_none());
}
#[test]
fn test_is_low_disk() {
let _guard = test_guard!();
let mut caps = WorkerCapabilities::new();
caps.disk_free_gb = Some(15.0);
assert_eq!(caps.is_low_disk(10.0), Some(false));
assert_eq!(caps.is_low_disk(15.0), Some(false)); assert_eq!(caps.is_low_disk(20.0), Some(true));
caps.disk_free_gb = None;
assert!(caps.is_low_disk(10.0).is_none());
}
#[test]
fn test_selection_config_preflight_defaults() {
let _guard = test_guard!();
let config = SelectionConfig::default();
assert_eq!(config.max_load_per_core, Some(2.0));
assert_eq!(config.min_free_gb, Some(10.0));
}
#[test]
fn test_selection_config_preflight_serde() {
let _guard = test_guard!();
let config = SelectionConfig {
max_load_per_core: Some(3.5),
min_free_gb: Some(25.0),
..Default::default()
};
let json = serde_json::to_string(&config).unwrap();
let parsed: SelectionConfig = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.max_load_per_core, Some(3.5));
assert_eq!(parsed.min_free_gb, Some(25.0));
}
#[test]
fn test_worker_capabilities_health_metrics_serde() {
let _guard = test_guard!();
let caps = WorkerCapabilities {
num_cpus: Some(8),
load_avg_1: Some(1.5),
load_avg_5: Some(2.0),
load_avg_15: Some(1.8),
disk_free_gb: Some(50.5),
disk_total_gb: Some(100.0),
..Default::default()
};
let json = serde_json::to_string(&caps).unwrap();
let parsed: WorkerCapabilities = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.num_cpus, Some(8));
assert_eq!(parsed.load_avg_1, Some(1.5));
assert_eq!(parsed.disk_free_gb, Some(50.5));
}
}