use crate::cache::readonly::ReadOnlyStorage;
use crate::cache::{storage_from_config, CacheMode, Storage};
use crate::compiler::{
get_compiler_info, CacheControl, CompileResult, Compiler, CompilerArguments, CompilerHasher,
CompilerKind, CompilerProxy, DistType, Language, MissType,
};
#[cfg(feature = "dist-client")]
use crate::config;
use crate::config::Config;
use crate::dist;
use crate::jobserver::Client;
use crate::mock_command::{CommandCreatorSync, ProcessCommandCreator};
use crate::protocol::{Compile, CompileFinished, CompileResponse, Request, Response};
use crate::util;
#[cfg(feature = "dist-client")]
use anyhow::Context as _;
use bytes::{buf::BufMut, Bytes, BytesMut};
use filetime::FileTime;
use fs::metadata;
use fs_err as fs;
use futures::channel::mpsc;
use futures::future::FutureExt;
use futures::{future, stream, Sink, SinkExt, Stream, StreamExt, TryFutureExt};
use number_prefix::NumberPrefix;
use serde::{Deserialize, Serialize};
use std::cell::Cell;
use std::collections::HashMap;
use std::env;
use std::ffi::{OsStr, OsString};
use std::future::Future;
use std::io::{self, Write};
use std::marker::Unpin;
#[cfg(feature = "dist-client")]
use std::mem;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::path::PathBuf;
use std::pin::Pin;
use std::process::{ExitStatus, Output};
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use std::time::Duration;
#[cfg(feature = "dist-client")]
use std::time::Instant;
use std::u64;
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpListener,
runtime::Runtime,
time::{self, sleep, Sleep},
};
use tokio_serde::Framed;
use tokio_util::codec::{length_delimited, LengthDelimitedCodec};
use tower::Service;
use crate::errors::*;
/// Seconds of inactivity after which the server shuts itself down.
/// Overridable at runtime via the `SCCACHE_IDLE_TIMEOUT` environment variable
/// (see `get_idle_timeout`).
const DEFAULT_IDLE_TIMEOUT: u64 = 600;
/// Minimum delay before re-attempting to create the dist client after a
/// failed creation or an unreachable scheduler.
#[cfg(feature = "dist-client")]
const DIST_CLIENT_RECREATE_TIMEOUT: Duration = Duration::from_secs(30);
/// Startup status reported back to the launching process over the channel
/// named by `SCCACHE_STARTUP_NOTIFY` (see `notify_server_startup`).
#[derive(Debug, Serialize, Deserialize)]
pub enum ServerStartup {
    /// The server started and is listening on `port`.
    Ok { port: u16 },
    /// The configured port was already in use.
    AddrInUse,
    /// Startup did not complete in time.
    TimedOut,
    /// Any other startup failure, with a human-readable reason.
    Err { reason: String },
}
fn get_idle_timeout() -> u64 {
env::var("SCCACHE_IDLE_TIMEOUT")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(DEFAULT_IDLE_TIMEOUT)
}
/// Serialize `status` as length-prefixed bincode onto `w` — the wire format
/// shared by the unix-socket and windows-pipe notification paths below.
fn notify_server_startup_internal<W: Write>(mut w: W, status: ServerStartup) -> Result<()> {
    util::write_length_prefixed_bincode(&mut w, status)
}
/// Report `status` to the process that launched us, if it asked for
/// notification by providing a unix-socket path in `name`.
///
/// A `None` name means no notification was requested; succeed silently.
#[cfg(unix)]
fn notify_server_startup(name: &Option<OsString>, status: ServerStartup) -> Result<()> {
    use std::os::unix::net::UnixStream;
    if let Some(name) = name {
        debug!("notify_server_startup({:?})", status);
        let stream = UnixStream::connect(name)?;
        notify_server_startup_internal(stream, status)
    } else {
        Ok(())
    }
}
/// Report `status` to the process that launched us, if it asked for
/// notification by providing a named-pipe path in `name`.
///
/// A `None` name means no notification was requested; succeed silently.
#[cfg(windows)]
fn notify_server_startup(name: &Option<OsString>, status: ServerStartup) -> Result<()> {
    use fs::OpenOptions;
    match name {
        Some(name) => {
            let pipe = OpenOptions::new().write(true).read(true).open(name)?;
            notify_server_startup_internal(pipe, status)
        }
        None => Ok(()),
    }
}
/// Extract the signal number that terminated a process.
///
/// Panics if the process was not killed by a signal; callers only reach this
/// after `status.code()` returned `None` (see the `CompileFinished` handling).
#[cfg(unix)]
fn get_signal(status: ExitStatus) -> i32 {
    use std::os::unix::prelude::*;
    status.signal().expect("must have signal")
}
/// Windows processes are never terminated by POSIX signals, so this path is
/// unreachable on this platform; panic if it is ever hit.
#[cfg(windows)]
fn get_signal(_status: ExitStatus) -> i32 {
    panic!("no signals on windows")
}
/// Holder for the distributed-compilation client and its retry/failure state.
///
/// With the `dist-client` feature disabled this is a field-less struct and
/// every operation is a no-op (see the `not(feature)` impl below).
pub struct DistClientContainer {
    // Current client state, guarded by an async mutex because state
    // transitions happen from async request handlers.
    #[cfg(feature = "dist-client")]
    state: futures::lock::Mutex<DistClientState>,
}
/// Everything needed to (re)create a dist client, captured once from `Config`
/// so failed creations can be retried later without re-reading configuration.
#[cfg(feature = "dist-client")]
struct DistClientConfig {
    // Runtime handle the HTTP client is built on.
    pool: tokio::runtime::Handle,
    // `None` disables distributed compilation entirely.
    scheduler_url: Option<config::HTTPUrl>,
    auth: config::DistAuth,
    cache_dir: PathBuf,
    toolchain_cache_size: u64,
    toolchains: Vec<config::DistToolchainConfig>,
    rewrite_includes_only: bool,
}
/// Lifecycle of the distributed-compilation client.
///
/// The per-variant `#[cfg(feature = "dist-client")]` attributes that were
/// here previously were redundant — the entire enum is already gated on that
/// feature — and have been removed.
#[cfg(feature = "dist-client")]
enum DistClientState {
    /// A working client, kept with the config used to build it so the state
    /// can be torn down and rebuilt on `reset_state`.
    Some(Box<DistClientConfig>, Arc<dyn dist::Client>),
    /// Creation failed with a message to surface to the user on the next
    /// request (after which `get_client` converts this into a retry state).
    FailWithMessage(Box<DistClientConfig>, String),
    /// Creation failed; do not retry before the stored instant.
    RetryCreateAt(Box<DistClientConfig>, Instant),
    /// Distributed compilation is not configured.
    Disabled,
}
/// No-op implementation used when the `dist-client` feature is compiled out.
///
/// The redundant `#[cfg(not(feature = "dist-client"))]` attribute that was on
/// `fn new` has been removed — the whole impl block is already gated on the
/// same predicate.
#[cfg(not(feature = "dist-client"))]
impl DistClientContainer {
    /// Warn if a scheduler was configured while dist support is compiled out,
    /// then return the (field-less) container.
    fn new(config: &Config, _: &tokio::runtime::Handle) -> Self {
        if config.dist.scheduler_url.is_some() {
            warn!("Scheduler address configured but dist feature disabled, disabling distributed sccache")
        }
        Self {}
    }

    pub fn new_disabled() -> Self {
        Self {}
    }

    /// Nothing to reset without a dist client.
    pub async fn reset_state(&self) {}

    /// Always reports dist compilation as disabled.
    pub async fn get_status(&self) -> DistInfo {
        DistInfo::Disabled("dist-client feature not selected".to_string())
    }

    /// There is never a client to hand out.
    async fn get_client(&self) -> Result<Option<Arc<dyn dist::Client>>> {
        Ok(None)
    }
}
#[cfg(feature = "dist-client")]
impl DistClientContainer {
    /// Capture the dist-related parts of `config` and synchronously attempt
    /// the first client creation on the given runtime handle.
    fn new(config: &Config, pool: &tokio::runtime::Handle) -> Self {
        let config = DistClientConfig {
            pool: pool.clone(),
            scheduler_url: config.dist.scheduler_url.clone(),
            auth: config.dist.auth.clone(),
            cache_dir: config.dist.cache_dir.clone(),
            toolchain_cache_size: config.dist.toolchain_cache_size,
            toolchains: config.dist.toolchains.clone(),
            rewrite_includes_only: config.dist.rewrite_includes_only,
        };
        let state = Self::create_state(config);
        // Block until the initial creation attempt resolves; failures are
        // recorded in the state (RetryCreateAt / FailWithMessage), not raised.
        let state = pool.block_on(state);
        Self {
            state: futures::lock::Mutex::new(state),
        }
    }

    pub fn new_disabled() -> Self {
        Self {
            state: futures::lock::Mutex::new(DistClientState::Disabled),
        }
    }

    /// Drop any existing client/failure state and force a recreation attempt
    /// on the next `get_client` call (retry instant is set 1s in the past).
    pub async fn reset_state(&self) {
        let mut guard = self.state.lock().await;
        let state = &mut *guard;
        match mem::replace(state, DistClientState::Disabled) {
            DistClientState::Some(cfg, _)
            | DistClientState::FailWithMessage(cfg, _)
            | DistClientState::RetryCreateAt(cfg, _) => {
                warn!("State reset. Will recreate");
                *state = DistClientState::RetryCreateAt(
                    cfg,
                    Instant::now().checked_sub(Duration::from_secs(1)).unwrap(),
                );
            }
            DistClientState::Disabled => (),
        }
    }

    /// Describe the current dist state for `--dist-status`, querying the
    /// scheduler when a client exists.
    pub async fn get_status(&self) -> DistInfo {
        let mut guard = self.state.lock().await;
        let state = &mut *guard;
        let (client, scheduler_url) = match state {
            DistClientState::Disabled => return DistInfo::Disabled("disabled".to_string()),
            DistClientState::FailWithMessage(cfg, _) => {
                return DistInfo::NotConnected(
                    cfg.scheduler_url.clone(),
                    "enabled, auth not configured".to_string(),
                )
            }
            DistClientState::RetryCreateAt(cfg, _) => {
                return DistInfo::NotConnected(
                    cfg.scheduler_url.clone(),
                    "enabled, not connected, will retry".to_string(),
                )
            }
            DistClientState::Some(cfg, client) => (Arc::clone(client), cfg.scheduler_url.clone()),
        };
        match client.do_get_status().await {
            Ok(res) => DistInfo::SchedulerStatus(scheduler_url.clone(), res),
            Err(_) => DistInfo::NotConnected(
                scheduler_url.clone(),
                "could not communicate with scheduler".to_string(),
            ),
        }
    }

    /// Return the dist client if one is available, retrying creation first if
    /// the retry deadline has passed.
    ///
    /// A `FailWithMessage` state produces an `Err` exactly once: the message
    /// is returned to the caller and the state is downgraded to an immediate
    /// `RetryCreateAt` so subsequent calls retry instead of re-erroring.
    async fn get_client(&self) -> Result<Option<Arc<dyn dist::Client>>> {
        let mut guard = self.state.lock().await;
        let state = &mut *guard;
        Self::maybe_recreate_state(state).await;
        let res = match state {
            DistClientState::Some(_, dc) => Ok(Some(dc.clone())),
            DistClientState::Disabled | DistClientState::RetryCreateAt(_, _) => Ok(None),
            DistClientState::FailWithMessage(_, msg) => Err(anyhow!(msg.clone())),
        };
        if res.is_err() {
            let config = match mem::replace(state, DistClientState::Disabled) {
                DistClientState::FailWithMessage(config, _) => config,
                // res is only Err in the FailWithMessage arm above.
                _ => unreachable!(),
            };
            *state = DistClientState::RetryCreateAt(
                config,
                Instant::now().checked_sub(Duration::from_secs(1)).unwrap(),
            );
        }
        res
    }

    /// If the state is `RetryCreateAt` and the deadline has passed, attempt
    /// to create a fresh client in place; otherwise leave the state alone.
    async fn maybe_recreate_state(state: &mut DistClientState) {
        if let DistClientState::RetryCreateAt(_, instant) = *state {
            if instant > Instant::now() {
                return;
            }
            let config = match mem::replace(state, DistClientState::Disabled) {
                DistClientState::RetryCreateAt(config, _) => config,
                // Guarded by the if-let above.
                _ => unreachable!(),
            };
            info!("Attempting to recreate the dist client");
            *state = Self::create_state(*config).await
        }
    }

    /// Attempt to build a dist client from `config`, classifying failures:
    /// transient errors become `RetryCreateAt` (retried after
    /// `DIST_CLIENT_RECREATE_TIMEOUT`); unactionable-without-the-user errors
    /// (e.g. missing auth token) become `FailWithMessage`.
    async fn create_state(config: DistClientConfig) -> DistClientState {
        // Transient failure: log and schedule a retry.
        macro_rules! try_or_retry_later {
            ($v:expr) => {{
                match $v {
                    Ok(v) => v,
                    Err(e) => {
                        error!("{:?}", e);
                        return DistClientState::RetryCreateAt(
                            Box::new(config),
                            Instant::now() + DIST_CLIENT_RECREATE_TIMEOUT,
                        );
                    }
                }
            }};
        }
        // Fatal-until-user-acts failure: log and store the message.
        macro_rules! try_or_fail_with_message {
            ($v:expr) => {{
                match $v {
                    Ok(v) => v,
                    Err(e) => {
                        let errmsg = format!("{:?}", e);
                        error!("{}", errmsg);
                        return DistClientState::FailWithMessage(
                            Box::new(config),
                            errmsg.to_string(),
                        );
                    }
                }
            }};
        }
        match config.scheduler_url {
            Some(ref addr) => {
                let url = addr.to_url();
                info!("Enabling distributed sccache to {}", url);
                let auth_token = match &config.auth {
                    config::DistAuth::Token { token } => Ok(token.to_owned()),
                    config::DistAuth::Oauth2CodeGrantPKCE { auth_url, .. }
                    | config::DistAuth::Oauth2Implicit { auth_url, .. } => {
                        Self::get_cached_config_auth_token(auth_url)
                    }
                };
                let auth_token = try_or_fail_with_message!(auth_token
                    .context("could not load client auth token, run |sccache --dist-auth|"));
                let dist_client = dist::http::Client::new(
                    &config.pool,
                    url,
                    &config.cache_dir.join("client"),
                    config.toolchain_cache_size,
                    &config.toolchains,
                    auth_token,
                    config.rewrite_includes_only,
                );
                let dist_client =
                    try_or_retry_later!(dist_client.context("failure during dist client creation"));
                use crate::dist::Client;
                // Verify the scheduler is actually reachable before declaring
                // the client usable.
                match dist_client.do_get_status().await {
                    Ok(res) => {
                        info!(
                            "Successfully created dist client with {:?} cores across {:?} servers",
                            res.num_cpus, res.num_servers
                        );
                        DistClientState::Some(Box::new(config), Arc::new(dist_client))
                    }
                    Err(_) => {
                        warn!("Scheduler address configured, but could not communicate with scheduler");
                        DistClientState::RetryCreateAt(
                            Box::new(config),
                            Instant::now() + DIST_CLIENT_RECREATE_TIMEOUT,
                        )
                    }
                }
            }
            None => {
                info!("No scheduler address configured, disabling distributed sccache");
                DistClientState::Disabled
            }
        }
    }

    /// Look up the auth token for `auth_url` in the cached config written by
    /// `sccache --dist-auth`.
    fn get_cached_config_auth_token(auth_url: &str) -> Result<String> {
        let cached_config = config::CachedConfig::reload()?;
        cached_config
            .with(|c| c.dist.auth_tokens.get(auth_url).map(String::to_owned))
            .with_context(|| format!("token for url {} not present in cached config", auth_url))
    }
}
thread_local! {
    /// (file, line, column) of the most recent panic on this thread, recorded
    /// by the hook installed in `start_server` and consumed when a compile
    /// task catches a panic (see `start_compile_task`).
    static PANIC_LOCATION: Cell<Option<(String, u32, u32)>> = Cell::new(None);
}
/// Start an sccache server listening on `port` and block until it shuts down.
///
/// Startup success or failure is reported over the channel named by the
/// `SCCACHE_STARTUP_NOTIFY` environment variable, when set. Errors from
/// storage initialization, the storage health-check, or binding the port are
/// both reported over that channel and returned.
///
/// Fix: the `notify` argument of every `notify_server_startup` call had been
/// corrupted by an encoding mangle (`&notify` rendered as `¬ify`, i.e. the
/// HTML entity `&not;`); the intended `&notify` references are restored.
pub fn start_server(config: &Config, port: u16) -> Result<()> {
    info!("start_server: port: {}", port);
    // Record panic locations so compile tasks that catch a panic can report
    // where it originated (see PANIC_LOCATION / start_compile_task).
    let panic_hook = std::panic::take_hook();
    std::panic::set_hook(Box::new(move |info| {
        PANIC_LOCATION.with(|l| {
            l.set(
                info.location()
                    .map(|loc| (loc.file().to_string(), loc.line(), loc.column())),
            )
        });
        panic_hook(info)
    }));
    let client = unsafe { Client::new() };
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(std::cmp::max(20, 2 * num_cpus::get()))
        .build()?;
    let pool = runtime.handle().clone();
    let dist_client = DistClientContainer::new(config, &pool);
    let notify = env::var_os("SCCACHE_STARTUP_NOTIFY");
    let raw_storage = match storage_from_config(config, &pool) {
        Ok(storage) => storage,
        Err(err) => {
            error!("storage init failed for: {err:?}");
            notify_server_startup(
                &notify,
                ServerStartup::Err {
                    reason: err.to_string(),
                },
            )?;
            return Err(err);
        }
    };
    // Probe the storage backend; a read-only backend downgrades the cache.
    let cache_mode = runtime.block_on(async {
        match raw_storage.check().await {
            Ok(mode) => Ok(mode),
            Err(err) => {
                error!("storage check failed for: {err:?}");
                notify_server_startup(
                    &notify,
                    ServerStartup::Err {
                        reason: err.to_string(),
                    },
                )?;
                Err(err)
            }
        }
    })?;
    info!("server has setup with {cache_mode:?}");
    let storage = match cache_mode {
        CacheMode::ReadOnly => Arc::new(ReadOnlyStorage(raw_storage)),
        _ => raw_storage,
    };
    let res =
        SccacheServer::<ProcessCommandCreator>::new(port, runtime, client, dist_client, storage);
    match res {
        Ok(srv) => {
            let port = srv.port();
            info!("server started, listening on port {}", port);
            notify_server_startup(&notify, ServerStartup::Ok { port })?;
            // Run until idle timeout or explicit shutdown; no external signal
            // future is supplied here.
            srv.run(future::pending::<()>())?;
            Ok(())
        }
        Err(e) => {
            error!("failed to start server: {}", e);
            match e.downcast_ref::<io::Error>() {
                Some(io_err) if io::ErrorKind::AddrInUse == io_err.kind() => {
                    notify_server_startup(&notify, ServerStartup::AddrInUse)?;
                }
                // 10013 is WSAEACCES: typically a Windows port exclusion.
                Some(io_err) if cfg!(windows) && Some(10013) == io_err.raw_os_error() => {
                    let windows_help_message =
                        "A Windows port exclusion is blocking use of the configured port.\nTry setting SCCACHE_SERVER_PORT to a new value.";
                    let reason: String = format!("{windows_help_message}\n{e}");
                    notify_server_startup(&notify, ServerStartup::Err { reason })?;
                }
                _ => {
                    let reason = e.to_string();
                    notify_server_startup(&notify, ServerStartup::Err { reason })?;
                }
            };
            Err(e)
        }
    }
}
/// The sccache TCP server: accepts client connections on `listener` and
/// dispatches each one to a clone of `service`.
pub struct SccacheServer<C: CommandCreatorSync> {
    runtime: Runtime,
    listener: TcpListener,
    // Receives Request/Shutdown messages from connection handlers; drives the
    // idle-shutdown logic in `run`.
    rx: mpsc::Receiver<ServerMessage>,
    // Idle period after which the server shuts down; zero disables the timer.
    timeout: Duration,
    service: SccacheService<C>,
    // Awaited during shutdown; presumably resolves once all active
    // connections have finished (WaitUntilZero is defined elsewhere in this
    // file — confirm there).
    wait: WaitUntilZero,
}
impl<C: CommandCreatorSync> SccacheServer<C> {
    /// Bind a listener on 127.0.0.1:`port` and construct the server around it.
    pub fn new(
        port: u16,
        runtime: Runtime,
        client: Client,
        dist_client: DistClientContainer,
        storage: Arc<dyn Storage>,
    ) -> Result<SccacheServer<C>> {
        let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port);
        let listener = runtime.block_on(TcpListener::bind(&SocketAddr::V4(addr)))?;
        Ok(Self::with_listener(
            listener,
            runtime,
            client,
            dist_client,
            storage,
        ))
    }

    /// Construct the server around an already-bound listener.
    pub fn with_listener(
        listener: TcpListener,
        runtime: Runtime,
        client: Client,
        dist_client: DistClientContainer,
        storage: Arc<dyn Storage>,
    ) -> SccacheServer<C> {
        // Channel connection handlers use to signal activity and shutdown.
        let (tx, rx) = mpsc::channel(1);
        let (wait, info) = WaitUntilZero::new();
        let pool = runtime.handle().clone();
        let service = SccacheService::new(dist_client, storage, &client, pool, tx, info);
        SccacheServer {
            runtime,
            listener,
            rx,
            service,
            timeout: Duration::from_secs(get_idle_timeout()),
            wait,
        }
    }

    /// Override the idle timeout (used by tests).
    #[allow(dead_code)]
    pub fn set_idle_timeout(&mut self, timeout: Duration) {
        self.timeout = timeout;
    }

    /// Swap the storage backend (used by tests).
    #[allow(dead_code)]
    pub fn set_storage(&mut self, storage: Arc<dyn Storage>) {
        self.service.storage = storage;
    }

    /// Handle to the runtime the service executes on.
    #[allow(dead_code)]
    pub fn pool(&self) -> &tokio::runtime::Handle {
        &self.service.rt
    }

    /// The command creator used to spawn compiler processes.
    #[allow(dead_code)]
    pub fn command_creator(&self) -> &C {
        &self.service.creator
    }

    /// The port the server is actually listening on (useful when bound to 0).
    #[allow(dead_code)]
    pub fn port(&self) -> u16 {
        self.listener.local_addr().unwrap().port()
    }

    /// Run the accept loop until `shutdown` resolves, a Shutdown message
    /// arrives, or the server has been idle for `timeout`; then wait (up to
    /// 10s) for in-flight client requests to finish.
    pub fn run<F>(self, shutdown: F) -> io::Result<()>
    where
        F: Future,
        C: Send,
    {
        let SccacheServer {
            runtime,
            listener,
            rx,
            service,
            timeout,
            wait,
        } = self;
        // Accept loop: each connection gets its own spawned task with a clone
        // of the service.
        let server = async move {
            loop {
                let (socket, _) = listener.accept().await?;
                trace!("incoming connection");
                let conn = service.clone().bind(socket).map_err(|res| {
                    error!("Failed to bind socket: {}", res);
                });
                #[allow(clippy::let_underscore_future)]
                let _ = tokio::spawn(conn);
            }
        };
        let shutdown = shutdown.map(|_| {
            info!("shutting down due to explicit signal");
        });
        // Resolves when a Shutdown message arrives or the idle timer fires;
        // a zero timeout disables the idle timer entirely.
        let shutdown_idle = async {
            ShutdownOrInactive {
                rx,
                timeout: if timeout != Duration::new(0, 0) {
                    Some(Box::pin(sleep(timeout)))
                } else {
                    None
                },
                timeout_dur: timeout,
            }
            .await;
            info!("shutting down due to being idle or request");
        };
        runtime.block_on(async {
            futures::select! {
                server = server.fuse() => server,
                _res = shutdown.fuse() => Ok(()),
                _res = shutdown_idle.fuse() => Ok::<_, io::Error>(()),
            }
        })?;
        const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
        info!(
            "moving into the shutdown phase now, waiting at most {} seconds \
for all client requests to complete",
            SHUTDOWN_TIMEOUT.as_secs()
        );
        // Give outstanding requests a bounded grace period to drain.
        runtime.block_on(async { time::timeout(SHUTDOWN_TIMEOUT, wait).await })?;
        info!("ok, fully shutting down now");
        Ok(())
    }
}
/// Cache of compiler proxies keyed by the requested executable path; the
/// stored `FileTime` is used for invalidation.
type CompilerProxyMap<C> = HashMap<PathBuf, (Box<dyn CompilerProxy<C>>, FileTime)>;
/// Cache of detected compilers; a `None` value marks a path already known not
/// to be a usable compiler (see `compiler_info`'s error path).
type CompilerMap<C> = HashMap<PathBuf, Option<CompilerCacheEntry<C>>>;
/// A cached compiler plus the metadata used to decide when the entry is stale.
struct CompilerCacheEntry<C> {
    // The detected compiler implementation.
    pub compiler: Box<dyn Compiler<C>>,
    // Last-modified time of the compiler binary at detection time.
    pub mtime: FileTime,
    // For dist compilation: custom toolchain archive path and its mtime.
    pub dist_info: Option<(PathBuf, FileTime)>,
}
impl<C> CompilerCacheEntry<C> {
fn new(
compiler: Box<dyn Compiler<C>>,
mtime: FileTime,
dist_info: Option<(PathBuf, FileTime)>,
) -> Self {
Self {
compiler,
mtime,
dist_info,
}
}
}
/// Shared, cheaply-cloneable state behind every client connection; each
/// accepted socket gets a clone (all heavy members are behind `Arc`).
#[derive(Clone)]
struct SccacheService<C>
where
    C: Send,
{
    // Server-wide statistics, updated by request handlers.
    stats: Arc<Mutex<ServerStats>>,
    dist_client: Arc<DistClientContainer>,
    storage: Arc<dyn Storage>,
    // Detected-compiler cache (see CompilerMap).
    compilers: Arc<RwLock<CompilerMap<C>>>,
    // Compiler-proxy cache (see CompilerProxyMap).
    compiler_proxies: Arc<RwLock<CompilerProxyMap<C>>>,
    // Runtime handle compile tasks are spawned on.
    rt: tokio::runtime::Handle,
    // Creates compiler processes (mockable in tests via the C parameter).
    creator: C,
    // Signals request activity / shutdown to the accept loop.
    tx: mpsc::Sender<ServerMessage>,
    // Keeps the connection count alive for shutdown accounting.
    #[allow(dead_code)]
    info: ActiveInfo,
}
/// Request message paired with a (unit) streaming body.
type SccacheRequest = Message<Request, Body<()>>;
/// Response message, optionally followed by a stream of further `Response`
/// values (used to deliver `CompileFinished` after `CompileStarted`).
type SccacheResponse = Message<Response, Body<Response>>;
/// Messages sent from connection handlers to the accept loop in `run`.
pub enum ServerMessage {
    /// A request was received (feeds the idle-shutdown logic).
    Request,
    /// An explicit shutdown was requested by a client.
    Shutdown,
}
/// Tower service entry point: one `call` per protocol request.
impl<C> Service<SccacheRequest> for Arc<SccacheService<C>>
where
    C: CommandCreatorSync + Send + Sync + 'static,
{
    type Response = SccacheResponse;
    type Error = Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response>> + Send + 'static>>;

    fn call(&mut self, req: SccacheRequest) -> Self::Future {
        trace!("handle_client");
        // Best-effort activity ping to the idle-shutdown logic; a full or
        // closed channel is deliberately ignored.
        drop(self.tx.clone().start_send(ServerMessage::Request));
        let me = self.clone();
        Box::pin(async move {
            match req.into_inner() {
                Request::Compile(compile) => {
                    debug!("handle_client: compile");
                    me.stats.lock().await.compile_requests += 1;
                    me.handle_compile(compile).await
                }
                Request::GetStats => {
                    debug!("handle_client: get_stats");
                    me.get_info()
                        .await
                        .map(|i| Response::Stats(Box::new(i)))
                        .map(Message::WithoutBody)
                }
                Request::DistStatus => {
                    debug!("handle_client: dist_status");
                    me.get_dist_status()
                        .await
                        .map(Response::DistStatus)
                        .map(Message::WithoutBody)
                }
                Request::ZeroStats => {
                    debug!("handle_client: zero_stats");
                    me.zero_stats().await;
                    Ok(Message::WithoutBody(Response::ZeroStats))
                }
                Request::Shutdown => {
                    debug!("handle_client: shutdown");
                    let mut tx = me.tx.clone();
                    // Signal shutdown and gather final stats concurrently;
                    // the send result is ignored (server may already be
                    // stopping).
                    future::try_join(
                        async {
                            let _ = tx.send(ServerMessage::Shutdown).await;
                            Ok(())
                        },
                        me.get_info(),
                    )
                    .await
                    .map(move |(_, info)| {
                        Message::WithoutBody(Response::ShuttingDown(Box::new(info)))
                    })
                }
            }
        })
    }

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<()>> {
        Poll::Ready(Ok(()))
    }
}
use futures::future::Either;
use futures::TryStreamExt;
impl<C> SccacheService<C>
where
    C: CommandCreatorSync + Clone + Send + Sync + 'static,
{
    pub fn new(
        dist_client: DistClientContainer,
        storage: Arc<dyn Storage>,
        client: &Client,
        rt: tokio::runtime::Handle,
        tx: mpsc::Sender<ServerMessage>,
        info: ActiveInfo,
    ) -> SccacheService<C> {
        SccacheService {
            stats: Arc::default(),
            dist_client: Arc::new(dist_client),
            storage,
            compilers: Arc::default(),
            compiler_proxies: Arc::default(),
            rt,
            creator: C::new(client),
            tx,
            info,
        }
    }

    /// Wire this service to `socket`: frame the byte stream (length-delimited
    /// + bincode), feed each decoded request through `call`, and stream the
    /// responses (including multi-part bodies) back out.
    fn bind<T>(self, socket: T) -> impl Future<Output = Result<()>> + Send + Sized + 'static
    where
        T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    {
        let mut builder = length_delimited::Builder::new();
        // Optional override of the maximum frame size for very large
        // requests/responses.
        if let Ok(max_frame_length_str) = env::var("SCCACHE_MAX_FRAME_LENGTH") {
            if let Ok(max_frame_length) = max_frame_length_str.parse::<usize>() {
                builder.max_frame_length(max_frame_length);
            } else {
                warn!("Content of SCCACHE_MAX_FRAME_LENGTH is not a valid number, using default");
            }
        }
        let io = builder.new_framed(socket);
        let (sink, stream) = SccacheTransport {
            inner: Framed::new(io.sink_err_into().err_into(), BincodeCodec),
        }
        .split();
        let sink = sink.sink_err_into::<Error>();
        let me = Arc::new(self);
        stream
            .err_into::<Error>()
            .and_then(move |input| me.clone().call(input))
            .and_then(move |message| async move {
                // Flatten a response into frames: a plain message, or a
                // message followed by body chunks terminated by a None chunk.
                let fut = match message {
                    Message::WithoutBody(message) => {
                        let stream = stream::once(async move { Ok(Frame::Message { message }) });
                        Either::Left(stream)
                    }
                    Message::WithBody(message, body) => {
                        let stream = stream::once(async move { Ok(Frame::Message { message }) })
                            .chain(body.map_ok(|chunk| Frame::Body { chunk: Some(chunk) }))
                            .chain(stream::once(async move { Ok(Frame::Body { chunk: None }) }));
                        Either::Right(stream)
                    }
                };
                Ok(Box::pin(fut))
            })
            .try_flatten()
            .forward(sink)
    }

    async fn get_dist_status(&self) -> Result<DistInfo> {
        Ok(self.dist_client.get_status().await)
    }

    /// Snapshot the stats and combine them with storage info.
    async fn get_info(&self) -> Result<ServerInfo> {
        let stats = self.stats.lock().await.clone();
        ServerInfo::new(stats, Some(&*self.storage)).await
    }

    async fn zero_stats(&self) {
        *self.stats.lock().await = ServerStats::default();
    }

    /// Handle a compile request: detect the compiler, then either start a
    /// compile task or report why the request cannot be handled.
    async fn handle_compile(&self, compile: Compile) -> Result<SccacheResponse> {
        let exe = compile.exe;
        let cmd = compile.args;
        let cwd: PathBuf = compile.cwd.into();
        let env_vars = compile.env_vars;
        let me = self.clone();
        let info = self
            .compiler_info(exe.into(), cwd.clone(), &cmd, &env_vars)
            .await;
        Ok(me.check_compiler(info, cmd, cwd, env_vars).await)
    }

    /// Look up (or detect and cache) the compiler at `path`.
    ///
    /// Resolution order: an existing compiler proxy for `path` (if any) maps
    /// it to the real executable; the result is canonicalized (symlinks
    /// resolved) when that preserves the file name; then the compiler cache is
    /// consulted, keyed by resolved path and invalidated on mtime or
    /// dist-toolchain changes. On a miss, full detection runs and both caches
    /// are updated; a path that fails detection is cached as `None`.
    async fn compiler_info(
        &self,
        path: PathBuf,
        cwd: PathBuf,
        args: &[OsString],
        env: &[(OsString, OsString)],
    ) -> Result<Box<dyn Compiler<C>>> {
        trace!("compiler_info");
        let me = self.clone();
        let me1 = self.clone();
        let path2 = path.clone();
        let path1 = path.clone();
        let env = env.to_vec();
        // Ask any registered proxy to resolve the real executable; the read
        // lock is released before awaiting would-be long operations below.
        let resolved_with_proxy = {
            let compiler_proxies_borrow = self.compiler_proxies.read().await;
            let resolve_proxied_executable =
                compiler_proxies_borrow
                    .get(&path)
                    .map(|(compiler_proxy, _filetime)| {
                        compiler_proxy.resolve_proxied_executable(
                            self.creator.clone(),
                            cwd.clone(),
                            env.as_slice(),
                        )
                    });
            match resolve_proxied_executable {
                Some(fut) => fut.await.ok(),
                None => None,
            }
        };
        // Fall back to the requested path and its own mtime when no proxy
        // resolution happened (or it failed).
        let (resolved_compiler_path, mtime) = match resolved_with_proxy {
            Some(x) => x,
            _ => {
                metadata(&path2)
                    .map(|attr| FileTime::from_last_modification_time(&attr))
                    .ok()
                    .map(move |filetime| (path2, filetime))
                    .expect("Must contain sane data, otherwise mtime is not avail")
            }
        };
        // Canonicalize only when it keeps the same file name, so tools that
        // dispatch on their invocation name (e.g. cc -> clang) are not
        // misidentified.
        let resolved_compiler_path = match resolved_compiler_path.canonicalize() {
            Ok(path) if matches!(path.file_name(), Some(name) if resolved_compiler_path.file_name() == Some(name)) => {
                path
            }
            _ => resolved_compiler_path,
        };
        // Dist toolchain archive (path + mtime) participates in cache
        // invalidation.
        let dist_info = match me1.dist_client.get_client().await {
            Ok(Some(ref client)) => {
                if let Some(archive) = client.get_custom_toolchain(&resolved_compiler_path) {
                    match metadata(&archive)
                        .map(|attr| FileTime::from_last_modification_time(&attr))
                    {
                        Ok(mtime) => Some((archive, mtime)),
                        _ => None,
                    }
                } else {
                    None
                }
            }
            _ => None,
        };
        // Cache hit only when both mtime and dist info are unchanged.
        let opt = match me1.compilers.read().await.get(&resolved_compiler_path) {
            Some(Some(entry)) => {
                if entry.mtime == mtime && entry.dist_info == dist_info {
                    Some(entry.compiler.box_clone())
                } else {
                    None
                }
            }
            _ => None,
        };
        match opt {
            Some(info) => {
                trace!("compiler_info cache hit");
                Ok(info)
            }
            None => {
                trace!("compiler_info cache miss");
                // Run full compiler detection.
                let info = get_compiler_info::<C>(
                    me.creator.clone(),
                    &path1,
                    &cwd,
                    args,
                    env.as_slice(),
                    &me.rt,
                    dist_info.clone().map(|(p, _)| p),
                )
                .await;
                let (c, proxy) = match info {
                    Ok((c, proxy)) => (c.clone(), proxy.clone()),
                    Err(err) => {
                        // Negative-cache the original path so detection is
                        // not retried on every request.
                        trace!("Inserting PLAIN cache map info for {:?}", &path);
                        me.compilers.write().await.insert(path, None);
                        return Err(err);
                    }
                };
                // Register a newly-discovered proxy under the original
                // (pre-resolution) path.
                if let Some(proxy) = proxy {
                    trace!(
                        "Inserting new path proxy {:?} @ {:?} -> {:?}",
                        &path,
                        &cwd,
                        resolved_compiler_path
                    );
                    me.compiler_proxies
                        .write()
                        .await
                        .insert(path, (proxy, mtime));
                }
                let map_info = CompilerCacheEntry::new(c.clone(), mtime, dist_info);
                trace!(
                    "Inserting POSSIBLY PROXIED cache map info for {:?}",
                    &resolved_compiler_path
                );
                me.compilers
                    .write()
                    .await
                    .insert(resolved_compiler_path, Some(map_info));
                Ok(c)
            }
        }
    }

    /// Decide what to do with a compile request given the compiler lookup
    /// result: start a compile task (streaming response), or report the
    /// request as unsupported / not cacheable / not a compilation.
    async fn check_compiler(
        &self,
        compiler: Result<Box<dyn Compiler<C>>>,
        cmd: Vec<OsString>,
        cwd: PathBuf,
        env_vars: Vec<(OsString, OsString)>,
    ) -> SccacheResponse {
        match compiler {
            Err(e) => {
                debug!("check_compiler: Unsupported compiler: {}", e.to_string());
                self.stats.lock().await.requests_unsupported_compiler += 1;
                return Message::WithoutBody(Response::Compile(
                    CompileResponse::UnsupportedCompiler(OsString::from(e.to_string())),
                ));
            }
            Ok(c) => {
                debug!("check_compiler: Supported compiler");
                match c.parse_arguments(&cmd, &cwd, &env_vars) {
                    CompilerArguments::Ok(hasher) => {
                        debug!("parse_arguments: Ok: {:?}", cmd);
                        self.stats.lock().await.requests_executed += 1;
                        // CompileStarted is sent immediately; the final
                        // CompileFinished arrives later over the body stream.
                        let (tx, rx) = Body::pair();
                        self.start_compile_task(c, hasher, cmd, cwd, env_vars, tx);
                        let res = CompileResponse::CompileStarted;
                        return Message::WithBody(Response::Compile(res), rx);
                    }
                    CompilerArguments::CannotCache(why, extra_info) => {
                        if let Some(extra_info) = extra_info {
                            debug!(
                                "parse_arguments: CannotCache({}, {}): {:?}",
                                why, extra_info, cmd
                            )
                        } else {
                            debug!("parse_arguments: CannotCache({}): {:?}", why, cmd)
                        }
                        let mut stats = self.stats.lock().await;
                        stats.requests_not_cacheable += 1;
                        *stats.not_cached.entry(why.to_string()).or_insert(0) += 1;
                    }
                    CompilerArguments::NotCompilation => {
                        debug!("parse_arguments: NotCompilation: {:?}", cmd);
                        self.stats.lock().await.requests_not_compile += 1;
                    }
                }
            }
        }
        // Fall-through for CannotCache / NotCompilation: the client runs the
        // compiler itself.
        let res = CompileResponse::UnhandledCompile;
        Message::WithoutBody(Response::Compile(res))
    }

    /// Spawn the actual cached-or-compile work onto the runtime; the outcome
    /// is delivered to the client as a `CompileFinished` over `tx`, and the
    /// cache write (if any) is awaited alongside sending the response.
    fn start_compile_task(
        &self,
        compiler: Box<dyn Compiler<C>>,
        hasher: Box<dyn CompilerHasher<C>>,
        arguments: Vec<OsString>,
        cwd: PathBuf,
        env_vars: Vec<(OsString, OsString)>,
        mut tx: mpsc::Sender<Result<Response>>,
    ) {
        // SCCACHE_RECACHE in the request's env forces a cache bypass.
        let force_recache = env_vars
            .iter()
            .any(|(k, _v)| k.as_os_str() == OsStr::new("SCCACHE_RECACHE"));
        let cache_control = if force_recache {
            CacheControl::ForceRecache
        } else {
            CacheControl::Default
        };
        let out_pretty = hasher.output_pretty().into_owned();
        let color_mode = hasher.color_mode();
        let me = self.clone();
        let kind = compiler.kind();
        let lang = hasher.language();
        let creator = self.creator.clone();
        let storage = self.storage.clone();
        let pool = self.rt.clone();
        let task = async move {
            let dist_client = me.dist_client.get_client().await;
            // Run the compile with a panic guard: a panic inside the compile
            // path is converted into an error that names the thread and the
            // recorded panic location (see PANIC_LOCATION).
            let result = match dist_client {
                Ok(client) => std::panic::AssertUnwindSafe(hasher.get_cached_or_compile(
                    client,
                    creator,
                    storage,
                    arguments,
                    cwd,
                    env_vars,
                    cache_control,
                    pool,
                ))
                .catch_unwind()
                .await
                .map_err(|e| {
                    let panic = e
                        .downcast_ref::<&str>()
                        .map(|s| &**s)
                        .or_else(|| e.downcast_ref::<String>().map(|s| &**s))
                        .unwrap_or("An unknown panic was caught.");
                    let thread = std::thread::current();
                    let thread_name = thread.name().unwrap_or("unnamed");
                    if let Some((file, line, column)) = PANIC_LOCATION.with(|l| l.take()) {
                        anyhow!(
                            "thread '{thread_name}' panicked at {file}:{line}:{column}: {panic}"
                        )
                    } else {
                        anyhow!("thread '{thread_name}' panicked: {panic}")
                    }
                })
                // Collapse Result<Result<_>> from catch_unwind + compile.
                .and_then(std::convert::identity),
                Err(e) => Err(e),
            };
            let mut cache_write = None;
            let mut res = CompileFinished {
                color_mode,
                ..Default::default()
            };
            match result {
                Ok((compiled, out)) => {
                    // Update statistics according to the compile outcome.
                    let mut stats = me.stats.lock().await;
                    match compiled {
                        CompileResult::Error => {
                            debug!("compile result: cache error");
                            stats.cache_errors.increment(&kind, &lang);
                        }
                        CompileResult::CacheHit(duration) => {
                            debug!("compile result: cache hit");
                            stats.cache_hits.increment(&kind, &lang);
                            stats.cache_read_hit_duration += duration;
                        }
                        CompileResult::CacheMiss(miss_type, dist_type, duration, future) => {
                            debug!("compile result: cache miss");
                            match dist_type {
                                DistType::NoDist => {}
                                DistType::Ok(id) => {
                                    let server = id.addr().to_string();
                                    let server_count =
                                        stats.dist_compiles.entry(server).or_insert(0);
                                    *server_count += 1;
                                }
                                DistType::Error => stats.dist_errors += 1,
                            }
                            match miss_type {
                                MissType::Normal => {}
                                MissType::ForcedRecache => {
                                    stats.forced_recaches += 1;
                                }
                                MissType::TimedOut => {
                                    stats.cache_timeouts += 1;
                                }
                                MissType::CacheReadError => {
                                    stats.cache_errors.increment(&kind, &lang);
                                }
                            }
                            stats.cache_misses.increment(&kind, &lang);
                            stats.compiler_write_duration += duration;
                            debug!("stats after compile result: {stats:?}");
                            // Deferred: awaited below, concurrently with
                            // sending the response.
                            cache_write = Some(future);
                        }
                        CompileResult::NotCacheable => {
                            debug!("compile result: not cacheable");
                            stats.cache_misses.increment(&kind, &lang);
                            stats.non_cacheable_compilations += 1;
                        }
                        CompileResult::CompileFailed => {
                            debug!("compile result: compile failed");
                            stats.compile_fails += 1;
                        }
                    };
                    // Release the stats lock before doing response I/O.
                    drop(stats);
                    let Output {
                        status,
                        stdout,
                        stderr,
                    } = out;
                    trace!("CompileFinished retcode: {}", status);
                    match status.code() {
                        Some(code) => res.retcode = Some(code),
                        None => res.signal = Some(get_signal(status)),
                    };
                    res.stdout = stdout;
                    res.stderr = stderr;
                }
                Err(err) => {
                    let mut stats = me.stats.lock().await;
                    match err.downcast::<ProcessError>() {
                        // The compiler itself failed: pass its output through.
                        Ok(ProcessError(output)) => {
                            debug!("Compilation failed: {:?}", output);
                            stats.compile_fails += 1;
                            match output.status.code() {
                                Some(code) => res.retcode = Some(code),
                                None => res.signal = Some(get_signal(output.status)),
                            };
                            res.stdout = output.stdout;
                            res.stderr = output.stderr;
                        }
                        Err(err) => match err.downcast::<HttpClientError>() {
                            // Dist client HTTP failure: reset dist state so it
                            // is recreated on the next request.
                            Ok(HttpClientError(msg)) => {
                                me.dist_client.reset_state().await;
                                let errmsg =
                                    format!("[{:?}] http error status: {}", out_pretty, msg);
                                error!("{}", errmsg);
                                res.retcode = Some(1);
                                res.stderr = errmsg.as_bytes().to_vec();
                            }
                            // Anything else: report the full error chain to
                            // the client on stderr.
                            Err(err) => {
                                use std::fmt::Write;
                                error!("[{:?}] fatal error: {}", out_pretty, err);
                                let mut error = "sccache: encountered fatal error\n".to_string();
                                let _ = writeln!(error, "sccache: error: {}", err);
                                for e in err.chain() {
                                    error!("[{:?}] \t{}", out_pretty, e);
                                    let _ = writeln!(error, "sccache: caused by: {}", e);
                                }
                                stats.cache_errors.increment(&kind, &lang);
                                res.retcode = Some(-2);
                                res.stderr = error.into_bytes();
                            }
                        },
                    }
                }
            };
            let send = tx
                .send(Ok(Response::CompileFinished(res)))
                .map_err(|e| anyhow!("send on finish failed").context(e));
            let me = me.clone();
            // Await the deferred cache write (if any) and record its outcome.
            let cache_write = async move {
                if let Some(cache_write) = cache_write {
                    match cache_write.await {
                        Err(e) => {
                            debug!("Error executing cache write: {}", e);
                            me.stats.lock().await.cache_write_errors += 1;
                        }
                        Ok(info) => {
                            debug!(
                                "[{}]: Cache write finished in {}",
                                info.object_file_pretty,
                                util::fmt_duration_as_secs(&info.duration)
                            );
                            let mut stats = me.stats.lock().await;
                            stats.cache_writes += 1;
                            stats.cache_write_duration += info.duration;
                        }
                    }
                }
                Ok(())
            };
            futures::future::try_join(send, cache_write).await?;
            Ok::<_, Error>(())
        };
        // Detach the task; failures are logged, not propagated.
        self.rt.spawn(async move {
            task.await
                .unwrap_or_else(|e| warn!("Failed to execute task: {:?}", e));
        });
    }
}
/// Counters broken down by language, kept at two granularities.
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct PerLanguageCount {
    // Keyed by language (via CompilerKind::lang_kind).
    counts: HashMap<String, u64>,
    // Keyed by (compiler, language) (via CompilerKind::lang_comp_kind).
    adv_counts: HashMap<String, u64>,
}
impl PerLanguageCount {
fn increment(&mut self, kind: &CompilerKind, lang: &Language) {
let lang_comp_key = kind.lang_comp_kind(lang);
let adv_count = self.adv_counts.entry(lang_comp_key).or_insert(0);
*adv_count += 1;
let lang_key = kind.lang_kind(lang);
let count = self.counts.entry(lang_key).or_insert(0);
*count += 1;
}
pub fn all(&self) -> u64 {
self.counts.values().sum()
}
pub fn get(&self, key: &str) -> Option<&u64> {
self.counts.get(key)
}
pub fn get_adv(&self, key: &str) -> Option<&u64> {
self.adv_counts.get(key)
}
pub fn new() -> PerLanguageCount {
Self::default()
}
}
/// Server-wide statistics, reported by `--show-stats` / `GetStats`.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ServerStats {
    pub compile_requests: u64,
    pub requests_unsupported_compiler: u64,
    pub requests_not_compile: u64,
    pub requests_not_cacheable: u64,
    pub requests_executed: u64,
    pub cache_errors: PerLanguageCount,
    pub cache_hits: PerLanguageCount,
    pub cache_misses: PerLanguageCount,
    pub cache_timeouts: u64,
    pub cache_read_errors: u64,
    pub non_cacheable_compilations: u64,
    // Compiles where SCCACHE_RECACHE forced a cache bypass.
    pub forced_recaches: u64,
    pub cache_write_errors: u64,
    pub cache_writes: u64,
    pub cache_write_duration: Duration,
    pub cache_read_hit_duration: Duration,
    pub compiler_write_duration: Duration,
    pub compile_fails: u64,
    // Reason string -> count for requests that could not be cached.
    pub not_cached: HashMap<String, usize>,
    // Dist server address -> number of compiles it handled.
    pub dist_compiles: HashMap<String, usize>,
    pub dist_errors: u64,
}
/// Stats plus cache-backend information, as returned for `GetStats` and
/// included in the `ShuttingDown` response.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ServerInfo {
    pub stats: ServerStats,
    pub cache_location: String,
    // `None` when the backend cannot report a size.
    pub cache_size: Option<u64>,
    pub max_cache_size: Option<u64>,
    pub use_preprocessor_cache_mode: bool,
    pub version: String,
}
/// Status of distributed compilation, as reported by `DistStatus`.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum DistInfo {
    /// Dist compilation disabled (reason included).
    Disabled(String),
    /// Enabled but not currently connected to the scheduler.
    #[cfg(feature = "dist-client")]
    NotConnected(Option<config::HTTPUrl>, String),
    /// Connected; carries the scheduler's own status report.
    #[cfg(feature = "dist-client")]
    SchedulerStatus(Option<config::HTTPUrl>, dist::SchedulerStatusResult),
}
impl Default for ServerStats {
    /// Every counter starts at zero, every duration at zero length, and every
    /// per-key map empty.
    fn default() -> ServerStats {
        ServerStats {
            compile_requests: 0,
            requests_unsupported_compiler: 0,
            requests_not_compile: 0,
            requests_not_cacheable: 0,
            requests_executed: 0,
            cache_errors: PerLanguageCount::default(),
            cache_hits: PerLanguageCount::default(),
            cache_misses: PerLanguageCount::default(),
            cache_timeouts: 0,
            cache_read_errors: 0,
            non_cacheable_compilations: 0,
            forced_recaches: 0,
            cache_write_errors: 0,
            cache_writes: 0,
            cache_write_duration: Duration::default(),
            cache_read_hit_duration: Duration::default(),
            compiler_write_duration: Duration::default(),
            compile_fails: 0,
            not_cached: HashMap::default(),
            dist_compiles: HashMap::default(),
            dist_errors: 0,
        }
    }
}
impl ServerStats {
    /// Print the statistics table to stdout and return the `(name_width,
    /// stat_width)` column widths used, so the caller can align additional
    /// rows underneath (see `ServerInfo::print`).
    ///
    /// When `advanced` is true the per-compiler breakdown (`adv_counts`) is
    /// shown for hits/misses/errors; otherwise the per-language breakdown
    /// (`counts`) is used.
    fn print(&self, advanced: bool) -> (usize, usize) {
        // Each row is (name, formatted value, extra width for the value column).
        macro_rules! set_stat {
            ($vec:ident, $var:expr, $name:expr) => {{
                $vec.push(($name.to_string(), $var.to_string(), 0));
            }};
        }
        // Aggregate count, then one row per language, sorted by language
        // name for deterministic output.
        macro_rules! set_lang_stat {
            ($vec:ident, $var:expr, $name:expr) => {{
                $vec.push(($name.to_string(), $var.all().to_string(), 0));
                let mut sorted_stats: Vec<_> = $var.counts.iter().collect();
                sorted_stats.sort_by_key(|v| v.0);
                for (lang, count) in sorted_stats.iter() {
                    $vec.push((format!("{} ({})", $name, lang), count.to_string(), 0));
                }
            }};
        }
        // Same shape as set_lang_stat, but over the advanced (per-compiler)
        // counts.
        macro_rules! set_compiler_stat {
            ($vec:ident, $var:expr, $name:expr) => {{
                $vec.push(($name.to_string(), $var.all().to_string(), 0));
                let mut sorted_stats: Vec<_> = $var.adv_counts.iter().collect();
                sorted_stats.sort_by_key(|v| v.0);
                for (lang, count) in sorted_stats.iter() {
                    $vec.push((format!("{} ({})", $name, lang), count.to_string(), 0));
                }
            }};
        }
        // Average duration ($dur / $num), defaulting to zero when $num is 0
        // so we never divide by zero. The suffix width of 2 widens this row's
        // value column — presumably to account for the unit suffix emitted by
        // `fmt_duration_as_secs`; TODO confirm against util.
        macro_rules! set_duration_stat {
            ($vec:ident, $dur:expr, $num:expr, $name:expr) => {{
                let s = if $num > 0 {
                    $dur / $num as u32
                } else {
                    Default::default()
                };
                $vec.push(($name.to_string(), util::fmt_duration_as_secs(&s), 2));
            }};
        }
        let mut stats_vec = vec![];
        set_stat!(stats_vec, self.compile_requests, "Compile requests");
        set_stat!(
            stats_vec,
            self.requests_executed,
            "Compile requests executed"
        );
        if advanced {
            set_compiler_stat!(stats_vec, self.cache_hits, "Cache hits");
            set_compiler_stat!(stats_vec, self.cache_misses, "Cache misses");
        } else {
            set_lang_stat!(stats_vec, self.cache_hits, "Cache hits");
            set_lang_stat!(stats_vec, self.cache_misses, "Cache misses");
        }
        set_stat!(stats_vec, self.cache_timeouts, "Cache timeouts");
        set_stat!(stats_vec, self.cache_read_errors, "Cache read errors");
        set_stat!(stats_vec, self.forced_recaches, "Forced recaches");
        set_stat!(stats_vec, self.cache_write_errors, "Cache write errors");
        set_stat!(stats_vec, self.compile_fails, "Compilation failures");
        if advanced {
            set_compiler_stat!(stats_vec, self.cache_errors, "Cache errors");
        } else {
            set_lang_stat!(stats_vec, self.cache_errors, "Cache errors");
        }
        set_stat!(
            stats_vec,
            self.non_cacheable_compilations,
            "Non-cacheable compilations"
        );
        set_stat!(
            stats_vec,
            self.requests_not_cacheable,
            "Non-cacheable calls"
        );
        set_stat!(
            stats_vec,
            self.requests_not_compile,
            "Non-compilation calls"
        );
        set_stat!(
            stats_vec,
            self.requests_unsupported_compiler,
            "Unsupported compiler calls"
        );
        set_duration_stat!(
            stats_vec,
            self.cache_write_duration,
            self.cache_writes,
            "Average cache write"
        );
        set_duration_stat!(
            stats_vec,
            self.compiler_write_duration,
            self.cache_misses.all(),
            "Average compiler"
        );
        set_duration_stat!(
            stats_vec,
            self.cache_read_hit_duration,
            self.cache_hits.all(),
            "Average cache read hit"
        );
        set_stat!(
            stats_vec,
            self.dist_errors,
            "Failed distributed compilations"
        );
        // Column widths: widest name and widest value over all rows.
        // stats_vec is never empty here (rows were just pushed), so the
        // unwraps cannot fail.
        let name_width = stats_vec.iter().map(|(n, _, _)| n.len()).max().unwrap();
        let stat_width = stats_vec.iter().map(|(_, s, _)| s.len()).max().unwrap();
        for (name, stat, suffix_len) in stats_vec {
            println!(
                "{:<name_width$} {:>stat_width$}",
                name,
                stat,
                name_width = name_width,
                stat_width = stat_width + suffix_len
            );
        }
        if !self.dist_compiles.is_empty() {
            println!("\nSuccessful distributed compiles");
            // Sort descending by count (most frequent first).
            let mut counts: Vec<_> = self.dist_compiles.iter().collect();
            counts.sort_by(|(_, c1), (_, c2)| c1.cmp(c2).reverse());
            for (reason, count) in counts {
                println!(
                    " {:<name_width$} {:>stat_width$}",
                    reason,
                    count,
                    name_width = name_width - 2,
                    stat_width = stat_width
                );
            }
        }
        if !self.not_cached.is_empty() {
            println!("\nNon-cacheable reasons:");
            // Sort descending by count (most frequent first).
            let mut counts: Vec<_> = self.not_cached.iter().collect();
            counts.sort_by(|(_, c1), (_, c2)| c1.cmp(c2).reverse());
            for (reason, count) in counts {
                println!(
                    "{:<name_width$} {:>stat_width$}",
                    reason,
                    count,
                    name_width = name_width,
                    stat_width = stat_width
                );
            }
            println!();
        }
        (name_width, stat_width)
    }
}
impl ServerInfo {
    /// Build a server-info snapshot from `stats` plus, when available, the
    /// cache `storage` (location, preprocessor-cache flag, and current/max
    /// size queried concurrently).
    pub async fn new(stats: ServerStats, storage: Option<&dyn Storage>) -> Result<Self> {
        let (cache_location, use_preprocessor_cache_mode, cache_size, max_cache_size) =
            match storage {
                Some(storage) => {
                    // Query current and maximum size concurrently.
                    let (cache_size, max_cache_size) =
                        futures::try_join!(storage.current_size(), storage.max_size())?;
                    (
                        storage.location(),
                        storage
                            .preprocessor_cache_mode_config()
                            .use_preprocessor_cache_mode,
                        cache_size,
                        max_cache_size,
                    )
                }
                // No storage configured: report empty/unknown values.
                None => (String::new(), false, None, None),
            };
        Ok(ServerInfo {
            stats,
            cache_location,
            cache_size,
            max_cache_size,
            use_preprocessor_cache_mode,
            version: env!("CARGO_PKG_VERSION").to_string(),
        })
    }

    /// Print the statistics table followed by cache metadata, aligned to the
    /// column widths returned by `ServerStats::print`.
    pub fn print(&self, advanced: bool) {
        let (name_width, stat_width) = self.stats.print(advanced);
        println!(
            "{:<name_width$} {}",
            "Cache location",
            self.cache_location,
            name_width = name_width
        );
        // The preprocessor-cache question only applies to local disk storage.
        if self.cache_location.starts_with("Local disk") {
            let answer = if self.use_preprocessor_cache_mode {
                "yes"
            } else {
                "no"
            };
            println!(
                "{:<name_width$} {}",
                "Use direct/preprocessor mode?",
                answer,
                name_width = name_width
            );
        }
        println!(
            "{:<name_width$} {}",
            "Version (client)",
            self.version,
            name_width = name_width
        );
        for (name, val) in [
            ("Cache size", self.cache_size),
            ("Max cache size", self.max_cache_size),
        ] {
            if let Some(bytes) = val {
                // Format with a binary prefix (KiB, MiB, ...) when large
                // enough; small values are shown as plain bytes.
                let (val, suffix) = match NumberPrefix::binary(bytes as f64) {
                    NumberPrefix::Standalone(b) => (b.to_string(), "bytes".to_string()),
                    NumberPrefix::Prefixed(prefix, n) => {
                        (format!("{:.0}", n), format!("{}B", prefix))
                    }
                };
                println!(
                    "{:<name_width$} {:>stat_width$} {}",
                    name,
                    val,
                    suffix,
                    name_width = name_width,
                    stat_width = stat_width
                );
            }
        }
    }
}
/// A frame traveling over the server transport: either a complete message
/// or one chunk of a streamed body.
enum Frame<R, R1> {
    /// One chunk of a streamed body; a `None` chunk is treated as a no-op
    /// by the transport sink (see `SccacheTransport`'s `start_send`).
    Body { chunk: Option<R1> },
    /// A complete, self-contained message.
    Message { message: R },
}
/// A streaming body backed by an mpsc channel; each item is a `Result` so
/// mid-stream errors can be delivered to the consumer.
struct Body<R> {
    receiver: mpsc::Receiver<Result<R>>,
}
impl<R> Body<R> {
    /// Create a connected sender/body pair backed by an mpsc channel with a
    /// buffer size of 0.
    fn pair() -> (mpsc::Sender<Result<R>>, Self) {
        let (sender, receiver) = mpsc::channel(0);
        (sender, Body { receiver })
    }
}
impl<R> futures::Stream for Body<R> {
    type Item = Result<R>;

    /// Delegate polling straight to the underlying channel receiver.
    /// `mpsc::Receiver` is `Unpin`, so projecting through `get_mut` is fine.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        Pin::new(&mut self.get_mut().receiver).poll_next(cx)
    }
}
/// A protocol message, optionally accompanied by a streaming body.
enum Message<R, B> {
    WithBody(R, B),
    WithoutBody(R),
}

impl<R, B> Message<R, B> {
    /// Extract the message itself, discarding any attached body.
    fn into_inner(self) -> R {
        match self {
            Message::WithBody(message, _) | Message::WithoutBody(message) => message,
        }
    }
}
/// tokio-serde codec that (de)serializes frame payloads with `bincode`;
/// used by `SccacheTransport` for `Request`/`Response` values.
struct BincodeCodec;
impl<T> tokio_serde::Serializer<T> for BincodeCodec
where
T: serde::Serialize,
{
type Error = Error;
fn serialize(self: Pin<&mut Self>, item: &T) -> std::result::Result<Bytes, Self::Error> {
let mut bytes = BytesMut::new();
bincode::serialize_into((&mut bytes).writer(), item)?;
Ok(bytes.freeze())
}
}
impl<T> tokio_serde::Deserializer<T> for BincodeCodec
where
    T: serde::de::DeserializeOwned,
{
    type Error = Error;

    /// Decode a `T` from the raw frame bytes.
    fn deserialize(self: Pin<&mut Self>, buf: &BytesMut) -> std::result::Result<T, Self::Error> {
        bincode::deserialize(buf).map_err(Into::into)
    }
}
/// Transport wrapper over a raw socket speaking the sccache wire protocol:
/// length-delimited frames whose payloads are bincode-encoded `Request`s
/// (inbound) and `Response`s (outbound).
struct SccacheTransport<I: AsyncRead + AsyncWrite + Unpin> {
    // Layering, innermost first: length-delimited framing over the socket,
    // sink/stream error types converted into our `Error`, then tokio-serde
    // with `BincodeCodec` to turn byte frames into typed messages.
    inner: Framed<
        futures::stream::ErrInto<
            futures::sink::SinkErrInto<
                tokio_util::codec::Framed<I, LengthDelimitedCodec>,
                Bytes,
                Error,
            >,
            Error,
        >,
        Request,
        Response,
        BincodeCodec,
    >,
}
impl<I: AsyncRead + AsyncWrite + Unpin> Stream for SccacheTransport<I> {
    type Item = Result<Message<Request, Body<()>>>;

    /// Pull the next decoded `Request` off the wire and wrap it as a
    /// body-less `Message`; errors and end-of-stream pass through unchanged.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match Pin::new(&mut self.inner).poll_next(cx) {
            Poll::Ready(Some(Ok(req))) => Poll::Ready(Some(Ok(Message::WithoutBody(req)))),
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}
impl<I: AsyncRead + AsyncWrite + Unpin> Sink<Frame<Response, Response>> for SccacheTransport<I> {
    type Error = Error;

    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        Pin::new(&mut self.inner).poll_ready(cx)
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        Pin::new(&mut self.inner).poll_flush(cx)
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        Pin::new(&mut self.inner).poll_close(cx)
    }

    /// Forward whole messages and body chunks to the framed writer; an empty
    /// (`None`) body chunk sends nothing.
    fn start_send(mut self: Pin<&mut Self>, item: Frame<Response, Response>) -> Result<()> {
        let inner = Pin::new(&mut self.inner);
        match item {
            Frame::Message { message } => inner.start_send(message),
            Frame::Body { chunk: Some(chunk) } => inner.start_send(chunk),
            Frame::Body { chunk: None } => Ok(()),
        }
    }
}
/// Future that resolves when the server should exit: an explicit shutdown
/// message arrives, the message channel closes, or the inactivity timer
/// fires.
struct ShutdownOrInactive {
    rx: mpsc::Receiver<ServerMessage>,
    // Armed inactivity timer; `None` means no timer is currently armed, in
    // which case this future only resolves via the channel.
    timeout: Option<Pin<Box<Sleep>>>,
    // Inactivity duration; a zero duration means the timer is never
    // re-armed on activity.
    timeout_dur: Duration,
}
impl Future for ShutdownOrInactive {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Drain every message currently queued on the channel before
        // consulting the timer.
        loop {
            match Pin::new(&mut self.rx).poll_next(cx) {
                Poll::Pending => break,
                // Explicit shutdown request: resolve immediately.
                Poll::Ready(Some(ServerMessage::Shutdown)) => return Poll::Ready(()),
                Poll::Ready(Some(ServerMessage::Request)) => {
                    // Activity observed: restart the inactivity timer,
                    // unless a zero duration disables it.
                    if self.timeout_dur != Duration::new(0, 0) {
                        self.timeout = Some(Box::pin(sleep(self.timeout_dur)));
                    }
                }
                // Channel closed (all senders dropped): shut down.
                Poll::Ready(None) => return Poll::Ready(()),
            }
        }
        // No shutdown yet: resolve when the inactivity timer fires, or stay
        // pending indefinitely if no timer is armed.
        match self.timeout {
            None => Poll::Pending,
            Some(ref mut timeout) => timeout.as_mut().poll(cx),
        }
    }
}
/// Future that resolves once every paired `ActiveInfo` handle has been
/// dropped (see `WaitUntilZero::new`).
struct WaitUntilZero {
    // Weak handle to the shared state; a failed `upgrade()` means no
    // strong (`ActiveInfo`) handle remains.
    info: std::sync::Weak<std::sync::Mutex<Info>>,
}
/// Strong, cloneable handle paired with a `WaitUntilZero` future. When the
/// last clone is dropped, the shared `Info` is dropped too, waking the
/// waiting future.
#[derive(Clone)]
#[allow(dead_code)]
struct ActiveInfo {
    info: Arc<std::sync::Mutex<Info>>,
}
/// State shared between `WaitUntilZero` and `ActiveInfo`: the waker of the
/// task waiting for all strong handles to go away.
struct Info {
    waker: Option<Waker>,
}

impl Drop for Info {
    /// Dropping the shared state (i.e. the last strong handle is gone)
    /// wakes the waiting task, if one has registered a waker.
    fn drop(&mut self) {
        if let Some(waker) = &self.waker {
            waker.wake_by_ref();
        }
    }
}
impl WaitUntilZero {
    /// Create a paired waiter future and the first strong handle; the
    /// waiter holds only a weak reference to the shared state.
    fn new() -> (WaitUntilZero, ActiveInfo) {
        let info = Arc::new(std::sync::Mutex::new(Info { waker: None }));
        let waiter = WaitUntilZero {
            info: Arc::downgrade(&info),
        };
        (waiter, ActiveInfo { info })
    }
}
impl std::future::Future for WaitUntilZero {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
        match self.info.upgrade() {
            // No strong handle remains: done.
            None => std::task::Poll::Ready(()),
            Some(strong) => {
                // Register our waker so the final `Info::drop` can wake us.
                strong
                    .lock()
                    .expect("we can't panic when holding lock")
                    .waker = Some(cx.waker().clone());
                std::task::Poll::Pending
            }
        }
    }
}
#[test]
fn waits_until_zero() {
    // One live handle: the future must stay pending.
    let (wait, _active) = WaitUntilZero::new();
    assert_eq!(wait.now_or_never(), None);
    // A clone keeps the future pending even after the original is dropped.
    let (wait, active) = WaitUntilZero::new();
    let _active2 = active.clone();
    drop(active);
    assert_eq!(wait.now_or_never(), None);
    // Handle discarded immediately: the future resolves at once.
    let (wait, _) = WaitUntilZero::new();
    assert_eq!(wait.now_or_never(), Some(()));
    // Every clone dropped: the future resolves.
    let (wait, active) = WaitUntilZero::new();
    let active2 = active.clone();
    drop(active);
    drop(active2);
    assert_eq!(wait.now_or_never(), Some(()));
}