extern crate base64;
extern crate crossbeam_utils;
extern crate env_logger;
extern crate flate2;
extern crate hyperx;
extern crate jsonwebtoken as jwt;
extern crate libmount;
#[macro_use]
extern crate log;
extern crate nix;
extern crate openssl;
extern crate rand;
extern crate reqwest;
extern crate sccache;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
extern crate syslog;
extern crate tar;
extern crate void;
use anyhow::{bail, Context, Error, Result};
use rand::{rngs::OsRng, RngCore};
use sccache::config::{
scheduler as scheduler_config, server as server_config, INSECURE_DIST_CLIENT_TOKEN,
};
use sccache::dist::{
self, AllocJobResult, AssignJobResult, BuilderIncoming, CompileCommand, HeartbeatServerResult,
InputsReader, JobAlloc, JobAuthorizer, JobComplete, JobId, JobState, RunJobResult,
SchedulerIncoming, SchedulerOutgoing, SchedulerStatusResult, ServerId, ServerIncoming,
ServerNonce, ServerOutgoing, SubmitToolchainResult, TcCache, Toolchain, ToolchainReader,
UpdateJobStateResult,
};
use sccache::util::daemonize;
use std::collections::{btree_map, BTreeMap, HashMap, HashSet};
use std::env;
use std::io;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Mutex, MutexGuard};
use std::time::{Duration, Instant};
mod build;
mod cmdline;
mod token_check;
use cmdline::{AuthSubcommand, Command};
pub const INSECURE_DIST_SERVER_TOKEN: &str = "dangerously_insecure_server";
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
fn main() {
init_logging();
let command = match cmdline::try_parse_from(env::args()) {
Ok(cmd) => cmd,
Err(e) => match e.downcast::<clap::error::Error>() {
Ok(clap_err) => clap_err.exit(),
Err(some_other_err) => {
println!("sccache-dist: {some_other_err}");
for source_err in some_other_err.chain().skip(1) {
println!("sccache-dist: caused by: {source_err}");
}
std::process::exit(1);
}
},
};
std::process::exit(match run(command) {
Ok(s) => s,
Err(e) => {
eprintln!("sccache-dist: error: {}", e);
for e in e.chain().skip(1) {
eprintln!("sccache-dist: caused by: {}", e);
}
2
}
});
}
fn create_server_token(server_id: ServerId, auth_token: &str) -> String {
format!("{} {}", server_id.addr(), auth_token)
}
fn check_server_token(server_token: &str, auth_token: &str) -> Option<ServerId> {
let mut split = server_token.splitn(2, |c| c == ' ');
let server_addr = split.next().and_then(|addr| addr.parse().ok())?;
match split.next() {
Some(t) if t == auth_token => Some(ServerId::new(server_addr)),
Some(_) | None => None,
}
}
#[derive(Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct ServerJwt {
exp: u64,
server_id: ServerId,
}
fn create_jwt_server_token(
server_id: ServerId,
header: &jwt::Header,
key: &[u8],
) -> Result<String> {
let key = jwt::EncodingKey::from_secret(key);
jwt::encode(header, &ServerJwt { exp: 0, server_id }, &key).map_err(Into::into)
}
fn dangerous_insecure_extract_jwt_server_token(server_token: &str) -> Result<ServerId> {
let validation = {
let mut validation = jwt::Validation::default();
validation.validate_exp = false;
validation.validate_nbf = false;
validation.insecure_disable_signature_validation();
validation
};
let dummy_key = jwt::DecodingKey::from_secret(b"secret");
jwt::decode::<ServerJwt>(server_token, &dummy_key, &validation)
.map(|res| res.claims.server_id)
.map_err(Into::into)
}
fn check_jwt_server_token(
server_token: &str,
key: &[u8],
validation: &jwt::Validation,
) -> Option<ServerId> {
let key = jwt::DecodingKey::from_secret(key);
jwt::decode::<ServerJwt>(server_token, &key, validation)
.map(|res| res.claims.server_id)
.ok()
}
fn run(command: Command) -> Result<i32> {
match command {
Command::Auth(AuthSubcommand::Base64 { num_bytes }) => {
let mut bytes = vec![0; num_bytes];
OsRng.fill_bytes(&mut bytes);
println!("{}", base64::encode_config(&bytes, base64::URL_SAFE_NO_PAD));
Ok(0)
}
Command::Auth(AuthSubcommand::JwtHS256ServerToken {
secret_key,
server_id,
}) => {
let header = jwt::Header::new(jwt::Algorithm::HS256);
let secret_key = base64::decode_config(&secret_key, base64::URL_SAFE_NO_PAD)?;
let token = create_jwt_server_token(server_id, &header, &secret_key)
.context("Failed to create server token")?;
println!("{}", token);
Ok(0)
}
Command::Scheduler(scheduler_config::Config {
public_addr,
client_auth,
server_auth,
}) => {
let check_client_auth: Box<dyn dist::http::ClientAuthCheck> = match client_auth {
scheduler_config::ClientAuth::Insecure => Box::new(token_check::EqCheck::new(
INSECURE_DIST_CLIENT_TOKEN.to_owned(),
)),
scheduler_config::ClientAuth::Token { token } => {
Box::new(token_check::EqCheck::new(token))
}
scheduler_config::ClientAuth::JwtValidate {
audience,
issuer,
jwks_url,
} => Box::new(
token_check::ValidJWTCheck::new(audience, issuer, &jwks_url)
.context("Failed to create a checker for valid JWTs")?,
),
scheduler_config::ClientAuth::Mozilla { required_groups } => {
Box::new(token_check::MozillaCheck::new(required_groups))
}
scheduler_config::ClientAuth::ProxyToken { url, cache_secs } => {
Box::new(token_check::ProxyTokenCheck::new(url, cache_secs))
}
};
let check_server_auth: dist::http::ServerAuthCheck = match server_auth {
scheduler_config::ServerAuth::Insecure => {
warn!("Scheduler starting with DANGEROUSLY_INSECURE server authentication");
let token = INSECURE_DIST_SERVER_TOKEN;
Box::new(move |server_token| check_server_token(server_token, token))
}
scheduler_config::ServerAuth::Token { token } => {
Box::new(move |server_token| check_server_token(server_token, &token))
}
scheduler_config::ServerAuth::JwtHS256 { secret_key } => {
let secret_key = base64::decode_config(&secret_key, base64::URL_SAFE_NO_PAD)
.context("Secret key base64 invalid")?;
if secret_key.len() != 256 / 8 {
bail!("Size of secret key incorrect")
}
let validation = {
let mut validation = jwt::Validation::new(jwt::Algorithm::HS256);
validation.leeway = 0;
validation.validate_exp = false;
validation.validate_nbf = false;
validation
};
Box::new(move |server_token| {
check_jwt_server_token(server_token, &secret_key, &validation)
})
}
};
daemonize()?;
let scheduler = Scheduler::new();
let http_scheduler = dist::http::Scheduler::new(
public_addr,
scheduler,
check_client_auth,
check_server_auth,
);
void::unreachable(http_scheduler.start()?);
}
Command::Server(server_config::Config {
builder,
cache_dir,
public_addr,
scheduler_url,
scheduler_auth,
toolchain_cache_size,
}) => {
let builder: Box<dyn dist::BuilderIncoming> = match builder {
server_config::BuilderType::Docker => {
Box::new(build::DockerBuilder::new().context("Docker builder failed to start")?)
}
server_config::BuilderType::Overlay {
bwrap_path,
build_dir,
} => Box::new(
build::OverlayBuilder::new(bwrap_path, build_dir)
.context("Overlay builder failed to start")?,
),
};
let server_id = ServerId::new(public_addr);
let scheduler_auth = match scheduler_auth {
server_config::SchedulerAuth::Insecure => {
warn!("Server starting with DANGEROUSLY_INSECURE scheduler authentication");
create_server_token(server_id, INSECURE_DIST_SERVER_TOKEN)
}
server_config::SchedulerAuth::Token { token } => {
create_server_token(server_id, &token)
}
server_config::SchedulerAuth::JwtToken { token } => {
let token_server_id: ServerId =
dangerous_insecure_extract_jwt_server_token(&token)
.context("Could not decode scheduler auth jwt")?;
if token_server_id != server_id {
bail!(
"JWT server id ({:?}) did not match configured server id ({:?})",
token_server_id,
server_id
)
}
token
}
};
let server = Server::new(builder, &cache_dir, toolchain_cache_size)
.context("Failed to create sccache server instance")?;
let http_server = dist::http::Server::new(
public_addr,
scheduler_url.to_url(),
scheduler_auth,
server,
)
.context("Failed to create sccache HTTP server instance")?;
void::unreachable(http_server.start()?)
}
}
}
fn init_logging() {
if env::var(sccache::LOGGING_ENV).is_ok() {
match env_logger::Builder::from_env(sccache::LOGGING_ENV).try_init() {
Ok(_) => (),
Err(e) => panic!("Failed to initialize logging: {:?}", e),
}
}
}
const MAX_PER_CORE_LOAD: f64 = 10f64;
const SERVER_REMEMBER_ERROR_TIMEOUT: Duration = Duration::from_secs(300);
const UNCLAIMED_PENDING_TIMEOUT: Duration = Duration::from_secs(300);
const UNCLAIMED_READY_TIMEOUT: Duration = Duration::from_secs(60);
#[derive(Copy, Clone)]
struct JobDetail {
server_id: ServerId,
state: JobState,
}
pub struct Scheduler {
job_count: AtomicUsize,
jobs: Mutex<BTreeMap<JobId, JobDetail>>,
servers: Mutex<HashMap<ServerId, ServerDetails>>,
}
struct ServerDetails {
jobs_assigned: HashSet<JobId>,
jobs_unclaimed: HashMap<JobId, Instant>,
last_seen: Instant,
last_error: Option<Instant>,
num_cpus: usize,
server_nonce: ServerNonce,
job_authorizer: Box<dyn JobAuthorizer>,
}
impl Scheduler {
pub fn new() -> Self {
Scheduler {
job_count: AtomicUsize::new(0),
jobs: Mutex::new(BTreeMap::new()),
servers: Mutex::new(HashMap::new()),
}
}
fn prune_servers(
&self,
servers: &mut MutexGuard<HashMap<ServerId, ServerDetails>>,
jobs: &mut MutexGuard<BTreeMap<JobId, JobDetail>>,
) {
let now = Instant::now();
let mut dead_servers = Vec::new();
for (&server_id, details) in servers.iter_mut() {
if now.duration_since(details.last_seen) > dist::http::HEARTBEAT_TIMEOUT {
dead_servers.push(server_id);
}
}
for server_id in dead_servers {
warn!(
"Server {} appears to be dead, pruning it in the scheduler",
server_id.addr()
);
let server_details = servers
.remove(&server_id)
.expect("server went missing from map");
for job_id in server_details.jobs_assigned {
warn!(
"Non-terminated job {} was cleaned up in server pruning",
job_id
);
if jobs.remove(&job_id).is_none() {
warn!(
"Non-terminated job {} assignment originally failed.",
job_id
);
}
}
}
}
}
impl Default for Scheduler {
fn default() -> Self {
Self::new()
}
}
impl SchedulerIncoming for Scheduler {
fn handle_alloc_job(
&self,
requester: &dyn SchedulerOutgoing,
tc: Toolchain,
) -> Result<AllocJobResult> {
let (job_id, server_id, auth) = {
let mut servers = self.servers.lock().unwrap();
let res = {
let mut best = None;
let mut best_err = None;
let mut best_load: f64 = MAX_PER_CORE_LOAD;
let now = Instant::now();
for (&server_id, details) in servers.iter_mut() {
let load = details.jobs_assigned.len() as f64 / details.num_cpus as f64;
if let Some(last_error) = details.last_error {
if load < MAX_PER_CORE_LOAD {
if now.duration_since(last_error) > SERVER_REMEMBER_ERROR_TIMEOUT {
details.last_error = None;
}
match best_err {
Some((
_,
&mut ServerDetails {
last_error: Some(best_last_err),
..
},
)) => {
if last_error < best_last_err {
trace!(
"Selected {:?}, its most recent error is {:?} ago",
server_id,
now - last_error
);
best_err = Some((server_id, details));
}
}
_ => {
trace!(
"Selected {:?}, its most recent error is {:?} ago",
server_id,
now - last_error
);
best_err = Some((server_id, details));
}
}
}
} else if load < best_load {
best = Some((server_id, details));
trace!("Selected {:?} as the server with the best load", server_id);
best_load = load;
if load == 0f64 {
break;
}
}
}
if let Some((server_id, server_details)) = best.or(best_err) {
let job_count = self.job_count.fetch_add(1, Ordering::SeqCst) as u64;
let job_id = JobId(job_count);
assert!(server_details.jobs_assigned.insert(job_id));
assert!(server_details
.jobs_unclaimed
.insert(job_id, Instant::now())
.is_none());
info!(
"Job {} created and will be assigned to server {:?}",
job_id, server_id
);
let auth = server_details
.job_authorizer
.generate_token(job_id)
.map_err(Error::from)
.context("Could not create an auth token for this job")?;
Some((job_id, server_id, auth))
} else {
None
}
};
if let Some(res) = res {
res
} else {
let msg = format!(
"Insufficient capacity across {} available servers",
servers.len()
);
return Ok(AllocJobResult::Fail { msg });
}
};
let AssignJobResult {
state,
need_toolchain,
} = requester
.do_assign_job(server_id, job_id, tc, auth.clone())
.with_context(|| {
let mut servers = self.servers.lock().unwrap();
if let Some(entry) = servers.get_mut(&server_id) {
entry.last_error = Some(Instant::now());
entry.jobs_unclaimed.remove(&job_id);
if !entry.jobs_assigned.remove(&job_id) {
"assign job failed and job not known to the server"
} else {
"assign job failed, job un-assigned from the server"
}
} else {
"assign job failed and server not known"
}
})?;
{
let mut jobs = self.jobs.lock().unwrap();
info!(
"Job {} successfully assigned and saved with state {:?}",
job_id, state
);
assert!(jobs
.insert(job_id, JobDetail { server_id, state })
.is_none());
}
let job_alloc = JobAlloc {
auth,
job_id,
server_id,
};
Ok(AllocJobResult::Success {
job_alloc,
need_toolchain,
})
}
fn handle_heartbeat_server(
&self,
server_id: ServerId,
server_nonce: ServerNonce,
num_cpus: usize,
job_authorizer: Box<dyn JobAuthorizer>,
) -> Result<HeartbeatServerResult> {
if num_cpus == 0 {
bail!("Invalid number of CPUs (0) specified in heartbeat")
}
let mut jobs = self.jobs.lock().unwrap();
let mut servers = self.servers.lock().unwrap();
self.prune_servers(&mut servers, &mut jobs);
match servers.get_mut(&server_id) {
Some(ref mut details) if details.server_nonce == server_nonce => {
let now = Instant::now();
details.last_seen = now;
let mut stale_jobs = Vec::new();
for (&job_id, &last_seen) in details.jobs_unclaimed.iter() {
if now.duration_since(last_seen) < UNCLAIMED_READY_TIMEOUT {
continue;
}
if let Some(detail) = jobs.get(&job_id) {
match detail.state {
JobState::Ready => {
stale_jobs.push(job_id);
}
JobState::Pending => {
if now.duration_since(last_seen) > UNCLAIMED_PENDING_TIMEOUT {
stale_jobs.push(job_id);
}
}
state => {
warn!("Invalid unclaimed job state for {}: {}", job_id, state);
}
}
} else {
warn!("Unknown stale job {}", job_id);
stale_jobs.push(job_id);
}
}
if !stale_jobs.is_empty() {
warn!(
"The following stale jobs will be de-allocated: {:?}",
stale_jobs
);
for job_id in stale_jobs {
if !details.jobs_assigned.remove(&job_id) {
warn!(
"Stale job for server {} not assigned: {}",
server_id.addr(),
job_id
);
}
if details.jobs_unclaimed.remove(&job_id).is_none() {
warn!(
"Unknown stale job for server {}: {}",
server_id.addr(),
job_id
);
}
if jobs.remove(&job_id).is_none() {
warn!(
"Unknown stale job for server {}: {}",
server_id.addr(),
job_id
);
}
}
}
return Ok(HeartbeatServerResult { is_new: false });
}
Some(ref mut details) if details.server_nonce != server_nonce => {
for job_id in details.jobs_assigned.iter() {
if jobs.remove(job_id).is_none() {
warn!(
"Unknown job found when replacing server {}: {}",
server_id.addr(),
job_id
);
}
}
}
_ => (),
}
info!("Registered new server {:?}", server_id);
servers.insert(
server_id,
ServerDetails {
last_seen: Instant::now(),
last_error: None,
jobs_assigned: HashSet::new(),
jobs_unclaimed: HashMap::new(),
num_cpus,
server_nonce,
job_authorizer,
},
);
Ok(HeartbeatServerResult { is_new: true })
}
fn handle_update_job_state(
&self,
job_id: JobId,
server_id: ServerId,
job_state: JobState,
) -> Result<UpdateJobStateResult> {
let mut jobs = self.jobs.lock().unwrap();
let mut servers = self.servers.lock().unwrap();
if let btree_map::Entry::Occupied(mut entry) = jobs.entry(job_id) {
let job_detail = entry.get();
if job_detail.server_id != server_id {
bail!(
"Job id {} is not registered on server {:?}",
job_id,
server_id
)
}
let now = Instant::now();
let mut server_details = servers.get_mut(&server_id);
if let Some(ref mut details) = server_details {
details.last_seen = now;
};
match (job_detail.state, job_state) {
(JobState::Pending, JobState::Ready) => entry.get_mut().state = job_state,
(JobState::Ready, JobState::Started) => {
if let Some(details) = server_details {
details.jobs_unclaimed.remove(&job_id);
} else {
warn!("Job state updated, but server is not known to scheduler")
}
entry.get_mut().state = job_state
}
(JobState::Started, JobState::Complete) => {
let (job_id, _) = entry.remove_entry();
if let Some(entry) = server_details {
assert!(entry.jobs_assigned.remove(&job_id))
} else {
bail!("Job was marked as finished, but server is not known to scheduler")
}
}
(from, to) => bail!("Invalid job state transition from {} to {}", from, to),
}
info!("Job {} updated state to {:?}", job_id, job_state);
} else {
bail!("Unknown job")
}
Ok(UpdateJobStateResult::Success)
}
fn handle_status(&self) -> Result<SchedulerStatusResult> {
let mut jobs = self.jobs.lock().unwrap();
let mut servers = self.servers.lock().unwrap();
self.prune_servers(&mut servers, &mut jobs);
Ok(SchedulerStatusResult {
num_servers: servers.len(),
num_cpus: servers.values().map(|v| v.num_cpus).sum(),
in_progress: jobs.len(),
})
}
}
pub struct Server {
builder: Box<dyn BuilderIncoming>,
cache: Mutex<TcCache>,
job_toolchains: Mutex<HashMap<JobId, Toolchain>>,
}
impl Server {
pub fn new(
builder: Box<dyn BuilderIncoming>,
cache_dir: &Path,
toolchain_cache_size: u64,
) -> Result<Server> {
let cache = TcCache::new(&cache_dir.join("tc"), toolchain_cache_size)
.context("Failed to create toolchain cache")?;
Ok(Server {
builder,
cache: Mutex::new(cache),
job_toolchains: Mutex::new(HashMap::new()),
})
}
}
impl ServerIncoming for Server {
fn handle_assign_job(&self, job_id: JobId, tc: Toolchain) -> Result<AssignJobResult> {
let need_toolchain = !self.cache.lock().unwrap().contains_toolchain(&tc);
assert!(self
.job_toolchains
.lock()
.unwrap()
.insert(job_id, tc)
.is_none());
let state = if need_toolchain {
JobState::Pending
} else {
JobState::Ready
};
Ok(AssignJobResult {
state,
need_toolchain,
})
}
fn handle_submit_toolchain(
&self,
requester: &dyn ServerOutgoing,
job_id: JobId,
tc_rdr: ToolchainReader,
) -> Result<SubmitToolchainResult> {
requester
.do_update_job_state(job_id, JobState::Ready)
.context("Updating job state failed")?;
let tc = match self.job_toolchains.lock().unwrap().get(&job_id).cloned() {
Some(tc) => tc,
None => return Ok(SubmitToolchainResult::JobNotFound),
};
let mut cache = self.cache.lock().unwrap();
if cache.contains_toolchain(&tc) {
return Ok(SubmitToolchainResult::Success);
}
Ok(cache
.insert_with(&tc, |mut file| {
io::copy(&mut { tc_rdr }, &mut file).map(|_| ())
})
.map(|_| SubmitToolchainResult::Success)
.unwrap_or(SubmitToolchainResult::CannotCache))
}
fn handle_run_job(
&self,
requester: &dyn ServerOutgoing,
job_id: JobId,
command: CompileCommand,
outputs: Vec<String>,
inputs_rdr: InputsReader,
) -> Result<RunJobResult> {
requester
.do_update_job_state(job_id, JobState::Started)
.context("Updating job state failed")?;
let tc = self.job_toolchains.lock().unwrap().remove(&job_id);
let res = match tc {
None => Ok(RunJobResult::JobNotFound),
Some(tc) => {
match self
.builder
.run_build(tc, command, outputs, inputs_rdr, &self.cache)
{
Err(e) => Err(e.context("run build failed")),
Ok(res) => Ok(RunJobResult::Complete(JobComplete {
output: res.output,
outputs: res.outputs,
})),
}
}
};
requester
.do_update_job_state(job_id, JobState::Complete)
.context("Updating job state failed")?;
res
}
}