use super::cache_io::*;
#[cfg(feature = "azure")]
use crate::cache::azure::AzureBlobCache;
#[cfg(feature = "cos")]
use crate::cache::cos::COSCache;
use crate::cache::disk::DiskCache;
#[cfg(feature = "gcs")]
use crate::cache::gcs::GCSCache;
#[cfg(feature = "gha")]
use crate::cache::gha::GHACache;
#[cfg(feature = "memcached")]
use crate::cache::memcached::MemcachedCache;
use crate::cache::multilevel::{MultiLevelStats, MultiLevelStorage};
#[cfg(feature = "oss")]
use crate::cache::oss::OSSCache;
#[cfg(feature = "redis")]
use crate::cache::redis::RedisCache;
#[cfg(feature = "s3")]
use crate::cache::s3::S3Cache;
#[cfg(any(
feature = "azure",
feature = "gcs",
feature = "gha",
feature = "memcached",
feature = "redis",
feature = "s3",
feature = "webdav",
feature = "oss",
feature = "cos"
))]
use crate::cache::utils::normalize_key;
#[cfg(feature = "webdav")]
use crate::cache::webdav::WebdavCache;
use crate::compiler::PreprocessorCacheEntry;
use crate::config::{self, CacheType, Config, PreprocessorCacheModeConfig};
use crate::errors::*;
use async_trait::async_trait;
use bytes::Bytes;
use std::io;
use std::sync::Arc;
use std::time::Duration;
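/// An interface to cache storage.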
#[async_trait]
pub trait Storage: Send + Sync {
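    /// Get a cache entry by `key`. If an error occurs, or the entry is not
    /// found, return `Cache::Miss`.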
async fn get(&self, key: &str) -> Result<Cache>;
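    /// Put `entry` in the cache under `key`, returning the duration of the
    /// write on success.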
async fn put(&self, key: &str, entry: CacheWrite) -> Result<Duration>;
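    /// Get the raw bytes stored under `key`, without interpreting them as a
    /// cache entry. The default implementation reports a miss; backends that
    /// support raw access override it.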
async fn get_raw(&self, _key: &str) -> Result<Option<Bytes>> {
Ok(None)
}
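    /// Store raw bytes under `key`, returning the elapsed write time. The
    /// default implementation errors; backends that support raw access
    /// override it.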
async fn put_raw(&self, _key: &str, _data: Bytes) -> Result<Duration> {
Err(anyhow!("put_raw not implemented for this storage backend"))
}
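    /// Check that the storage is usable and determine whether it allows
    /// writes. The default implementation assumes read/write access.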
async fn check(&self) -> Result<CacheMode> {
Ok(CacheMode::ReadWrite)
}
fn location(&self) -> String;
fn cache_type_name(&self) -> &'static str {
"unknown"
}
async fn current_size(&self) -> Result<Option<u64>>;
async fn max_size(&self) -> Result<Option<u64>>;
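    /// Statistics for multi-level storage, if this backend collects them.
    /// Defaults to `None`.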
fn multilevel_stats(&self) -> Option<MultiLevelStats> {
None
}
fn preprocessor_cache_mode_config(&self) -> PreprocessorCacheModeConfig {
PreprocessorCacheModeConfig::default()
}
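    /// The base directories configured for this storage. Defaults to none.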
fn basedirs(&self) -> &[Vec<u8>] {
&[]
}
async fn get_preprocessor_cache_entry(
&self,
_key: &str,
) -> Result<Option<Box<dyn crate::lru_disk_cache::ReadSeek>>> {
Ok(None)
}
async fn put_preprocessor_cache_entry(
&self,
_key: &str,
_preprocessor_cache_entry: PreprocessorCacheEntry,
) -> Result<()> {
Ok(())
}
}
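/// A [`Storage`] implementation backed by an opendal
/// [`Operator`](opendal::Operator); all remote backends built by
/// [`build_single_cache`] are wrapped in this type.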
#[cfg(any(
feature = "azure",
feature = "gcs",
feature = "gha",
feature = "memcached",
feature = "redis",
feature = "s3",
feature = "webdav",
feature = "oss",
feature = "cos"
))]
pub struct RemoteStorage {
operator: opendal::Operator,
basedirs: Vec<Vec<u8>>,
}
#[cfg(any(
feature = "azure",
feature = "gcs",
feature = "gha",
feature = "memcached",
feature = "redis",
feature = "s3",
feature = "webdav",
feature = "oss",
feature = "cos"
))]
impl RemoteStorage {
pub fn new(operator: opendal::Operator, basedirs: Vec<Vec<u8>>) -> Self {
Self { operator, basedirs }
}
}
#[cfg(any(
feature = "azure",
feature = "gcs",
feature = "gha",
feature = "memcached",
feature = "redis",
feature = "s3",
feature = "webdav",
feature = "oss",
feature = "cos"
))]
#[async_trait]
impl Storage for RemoteStorage {
async fn get(&self, key: &str) -> Result<Cache> {
match self.operator.read(&normalize_key(key)).await {
Ok(res) => {
let hit = CacheRead::from(io::Cursor::new(res.to_bytes()))?;
Ok(Cache::Hit(hit))
}
Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(Cache::Miss),
Err(e) => {
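                // Treat unexpected read errors as cache misses so that a
                // flaky backend degrades to recompilation instead of
                // failing the build.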
warn!("Got unexpected error: {:?}", e);
Ok(Cache::Miss)
}
}
}
async fn put(&self, key: &str, entry: CacheWrite) -> Result<Duration> {
trace!("RemoteStorage::put({})", key);
let data = entry.finish()?;
self.put_raw(key, data.into()).await
}
async fn check(&self) -> Result<CacheMode> {
use opendal::ErrorKind;
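        // Use a small probe object to verify that the backend is reachable
        // and to determine whether we can write to it.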
let path = ".sccache_check";
match self.operator.read(path).await {
Ok(_) => (),
Err(err) if err.kind() == ErrorKind::NotFound => (),
Err(err) if err.kind() == ErrorKind::RateLimited => {
eprintln!("cache storage read check: {err:?}, but we decide to keep running");
}
Err(err) => bail!("cache storage failed to read: {:?}", err),
}
let can_write = match self.operator.write(path, "Hello, World!").await {
Ok(_) => true,
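            // Some backends refuse to overwrite an existing object; an
            // `AlreadyExists` error still means the storage is writable.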
Err(err) if err.kind() == ErrorKind::AlreadyExists => true,
Err(err) => {
eprintln!("storage write check failed: {err:?}");
false
}
};
let mode = if can_write {
CacheMode::ReadWrite
} else {
CacheMode::ReadOnly
};
debug!("storage check result: {mode:?}");
Ok(mode)
}
fn location(&self) -> String {
let meta = self.operator.info();
format!(
"{}, name: {}, prefix: {}",
meta.scheme(),
meta.name(),
meta.root()
)
}
fn cache_type_name(&self) -> &'static str {
        self.operator.info().scheme().into()
}
async fn current_size(&self) -> Result<Option<u64>> {
Ok(None)
}
async fn max_size(&self) -> Result<Option<u64>> {
Ok(None)
}
fn basedirs(&self) -> &[Vec<u8>] {
&self.basedirs
}
async fn get_raw(&self, key: &str) -> Result<Option<Bytes>> {
trace!("opendal::Operator::get_raw({})", key);
match self.operator.read(&normalize_key(key)).await {
Ok(res) => {
let data = res.to_bytes();
                trace!(
                    "RemoteStorage::get_raw({}): Found {} bytes",
                    key,
                    data.len()
                );
Ok(Some(data))
}
Err(e) if e.kind() == opendal::ErrorKind::NotFound => {
trace!("opendal::Operator::get_raw({}): NotFound", key);
Ok(None)
}
Err(e) => {
warn!("opendal::Operator::get_raw({}): Error: {:?}", key, e);
Err(anyhow!("Failed to read raw bytes: {:?}", e))
}
}
}
async fn put_raw(&self, key: &str, data: Bytes) -> Result<Duration> {
trace!("opendal::Operator::put_raw({}, {} bytes)", key, data.len());
let start = std::time::Instant::now();
self.operator.write(&normalize_key(key), data).await?;
Ok(start.elapsed())
}
}
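/// Build a remote [`Storage`] for a single `cache_type`, wrapping the
/// backend's opendal operator in a [`RemoteStorage`] with the given
/// `basedirs`.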
#[cfg(any(
feature = "azure",
feature = "gcs",
feature = "gha",
feature = "memcached",
feature = "redis",
feature = "s3",
feature = "webdav",
feature = "oss",
feature = "cos"
))]
pub fn build_single_cache(
cache_type: &CacheType,
basedirs: &[Vec<u8>],
_pool: &tokio::runtime::Handle,
) -> Result<Arc<dyn Storage>> {
match cache_type {
#[cfg(feature = "azure")]
CacheType::Azure(config::AzureCacheConfig {
connection_string,
container,
key_prefix,
}) => {
debug!("Init azure cache with container {container}, key_prefix {key_prefix}");
let operator = AzureBlobCache::build(connection_string, container, key_prefix)
.map_err(|err| anyhow!("create azure cache failed: {err:?}"))?;
let storage = RemoteStorage::new(operator, basedirs.to_vec());
Ok(Arc::new(storage))
}
#[cfg(feature = "gcs")]
CacheType::GCS(config::GCSCacheConfig {
bucket,
key_prefix,
cred_path,
rw_mode,
service_account,
credential_url,
}) => {
debug!("Init gcs cache with bucket {bucket}, key_prefix {key_prefix}");
let operator = GCSCache::build(
bucket,
key_prefix,
cred_path.as_deref(),
service_account.as_deref(),
(*rw_mode).into(),
credential_url.as_deref(),
)
.map_err(|err| anyhow!("create gcs cache failed: {err:?}"))?;
let storage = RemoteStorage::new(operator, basedirs.to_vec());
Ok(Arc::new(storage))
}
#[cfg(feature = "gha")]
CacheType::GHA(config::GHACacheConfig { version, .. }) => {
debug!("Init gha cache with version {version}");
let operator = GHACache::build(version)
.map_err(|err| anyhow!("create gha cache failed: {err:?}"))?;
let storage = RemoteStorage::new(operator, basedirs.to_vec());
Ok(Arc::new(storage))
}
#[cfg(feature = "memcached")]
CacheType::Memcached(config::MemcachedCacheConfig {
url,
username,
password,
expiration,
key_prefix,
}) => {
debug!("Init memcached cache with url {url}");
let operator = MemcachedCache::build(
url,
username.as_deref(),
password.as_deref(),
key_prefix,
*expiration,
)
.map_err(|err| anyhow!("create memcached cache failed: {err:?}"))?;
let storage = RemoteStorage::new(operator, basedirs.to_vec());
Ok(Arc::new(storage))
}
#[cfg(feature = "redis")]
CacheType::Redis(config::RedisCacheConfig {
endpoint,
cluster_endpoints,
username,
password,
db,
url,
ttl,
key_prefix,
}) => {
let storage = match (endpoint, cluster_endpoints, url) {
(Some(url), None, None) => {
debug!("Init redis single-node cache with url {url}");
RedisCache::build_single(
url,
username.as_deref(),
password.as_deref(),
*db,
key_prefix,
*ttl,
)
}
(None, Some(urls), None) => {
debug!("Init redis cluster cache with urls {urls}");
RedisCache::build_cluster(
urls,
username.as_deref(),
password.as_deref(),
*db,
key_prefix,
*ttl,
)
}
(None, None, Some(url)) => {
warn!("Init redis single-node cache from deprecated API with url {url}");
                    if username.is_some() || password.is_some() || *db != crate::config::DEFAULT_REDIS_DB {
                        bail!("`username`, `password` and `db` have no effect when `url` is set; use `endpoint` or `cluster_endpoints` instead");
}
RedisCache::build_from_url(url, key_prefix, *ttl)
}
                _ => bail!("Exactly one of `endpoint`, `cluster_endpoints` and `url` must be set"),
}
.map_err(|err| anyhow!("create redis cache failed: {err:?}"))?;
let storage = RemoteStorage::new(storage, basedirs.to_vec());
Ok(Arc::new(storage))
}
#[cfg(feature = "s3")]
CacheType::S3(c) => {
debug!(
"Init s3 cache with bucket {}, endpoint {:?}",
c.bucket, c.endpoint
);
let storage_builder =
S3Cache::new(c.bucket.clone(), c.key_prefix.clone(), c.no_credentials);
let operator = storage_builder
.with_region(c.region.clone())
.with_endpoint(c.endpoint.clone())
.with_use_ssl(c.use_ssl)
.with_server_side_encryption(c.server_side_encryption)
.with_enable_virtual_host_style(c.enable_virtual_host_style)
.build()
.map_err(|err| anyhow!("create s3 cache failed: {err:?}"))?;
let storage = RemoteStorage::new(operator, basedirs.to_vec());
Ok(Arc::new(storage))
}
#[cfg(feature = "webdav")]
CacheType::Webdav(c) => {
debug!("Init webdav cache with endpoint {}", c.endpoint);
let operator = WebdavCache::build(
&c.endpoint,
&c.key_prefix,
c.username.as_deref(),
c.password.as_deref(),
c.token.as_deref(),
)
.map_err(|err| anyhow!("create webdav cache failed: {err:?}"))?;
let storage = RemoteStorage::new(operator, basedirs.to_vec());
Ok(Arc::new(storage))
}
#[cfg(feature = "oss")]
CacheType::OSS(c) => {
debug!(
"Init oss cache with bucket {}, endpoint {:?}",
c.bucket, c.endpoint
);
let operator = OSSCache::build(
&c.bucket,
&c.key_prefix,
c.endpoint.as_deref(),
c.no_credentials,
)
.map_err(|err| anyhow!("create oss cache failed: {err:?}"))?;
let storage = RemoteStorage::new(operator, basedirs.to_vec());
Ok(Arc::new(storage))
}
#[cfg(feature = "cos")]
CacheType::COS(c) => {
debug!(
"Init cos cache with bucket {}, endpoint {:?}",
c.bucket, c.endpoint
);
let operator = COSCache::build(&c.bucket, &c.key_prefix, c.endpoint.as_deref())
.map_err(|err| anyhow!("create cos cache failed: {err:?}"))?;
let storage = RemoteStorage::new(operator, basedirs.to_vec());
Ok(Arc::new(storage))
}
#[allow(unreachable_patterns)]
_ => {
bail!("Cache type not supported with current feature configuration")
}
}
}
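/// Get a suitable [`Storage`] implementation from the config: multi-level
/// storage if configured, otherwise a single remote cache, otherwise the
/// local disk cache.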
pub fn storage_from_config(
config: &Config,
pool: &tokio::runtime::Handle,
) -> Result<Arc<dyn Storage>> {
if let Some(multilevel) = MultiLevelStorage::from_config(config, pool)? {
return Ok(Arc::new(multilevel));
}
#[cfg(any(
feature = "azure",
feature = "gcs",
feature = "gha",
feature = "memcached",
feature = "redis",
feature = "s3",
feature = "webdav",
feature = "oss",
feature = "cos"
))]
if let Some(cache_type) = &config.cache {
debug!("Configuring single cache from CacheType");
return build_single_cache(cache_type, &config.basedirs, pool);
}
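    // No remote cache is configured (or its feature is compiled out); fall
    // back to the local disk cache.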
let (dir, size) = (&config.fallback_cache.dir, config.fallback_cache.size);
let preprocessor_cache_mode_config = config.fallback_cache.preprocessor_cache_mode;
let rw_mode = config.fallback_cache.rw_mode.into();
debug!("Init disk cache with dir {:?}, size {}", dir, size);
Ok(Arc::new(DiskCache::new(
dir,
size,
pool,
preprocessor_cache_mode_config,
rw_mode,
config.basedirs.clone(),
)))
}
#[cfg(test)]
mod test {
use super::*;
use crate::config::CacheModeConfig;
use fs_err as fs;
#[test]
fn test_read_write_mode_local() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
let mut config = Config {
cache: None,
..Default::default()
};
let tempdir = tempfile::Builder::new()
.prefix("sccache_test_rust_cargo")
.tempdir()
.context("Failed to create tempdir")
.unwrap();
let cache_dir = tempdir.path().join("cache");
fs::create_dir(&cache_dir).unwrap();
config.fallback_cache.dir = cache_dir;
config.fallback_cache.rw_mode = CacheModeConfig::ReadWrite;
{
let cache = storage_from_config(&config, runtime.handle()).unwrap();
runtime.block_on(async move {
cache.put("test1", CacheWrite::default()).await.unwrap();
cache
.put_preprocessor_cache_entry("test1", PreprocessorCacheEntry::default())
.await
.unwrap();
});
}
config.fallback_cache.rw_mode = CacheModeConfig::ReadOnly;
{
let cache = storage_from_config(&config, runtime.handle()).unwrap();
runtime.block_on(async move {
assert_eq!(
cache
.put("test1", CacheWrite::default())
.await
.unwrap_err()
.to_string(),
"Cannot write to a read-only cache"
);
assert_eq!(
cache
.put_preprocessor_cache_entry("test1", PreprocessorCacheEntry::default())
.await
.unwrap_err()
.to_string(),
"Cannot write to a read-only cache"
);
});
}
}
#[test]
#[cfg(feature = "s3")]
fn test_operator_storage_s3_with_basedirs() {
let operator = crate::cache::s3::S3Cache::new(
"test-bucket".to_string(),
"test-prefix".to_string(),
            true,
        )
.with_region(Some("us-east-1".to_string()))
.build()
.expect("Failed to create S3 cache operator");
let basedirs = vec![b"/home/user/project".to_vec(), b"/opt/build".to_vec()];
let storage = RemoteStorage::new(operator, basedirs.clone());
assert_eq!(storage.basedirs(), basedirs.as_slice());
assert_eq!(storage.basedirs().len(), 2);
assert_eq!(storage.basedirs()[0], b"/home/user/project".to_vec());
assert_eq!(storage.basedirs()[1], b"/opt/build".to_vec());
}
#[test]
#[cfg(feature = "redis")]
fn test_operator_storage_redis_with_basedirs() {
let operator = crate::cache::redis::RedisCache::build_single(
"redis://localhost:6379",
None,
None,
0,
"test-prefix",
0,
)
.expect("Failed to create Redis cache operator");
let basedirs = vec![b"/workspace".to_vec()];
let storage = RemoteStorage::new(operator, basedirs.clone());
assert_eq!(storage.basedirs(), basedirs.as_slice());
assert_eq!(storage.basedirs().len(), 1);
}
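    // A minimal sketch exercising the `Storage` trait's default raw-byte
    // methods. `NoRawStorage` is a hypothetical stub defined only for this
    // test: it implements just the required methods, so `get_raw` and
    // `put_raw` fall back to the trait defaults.
    struct NoRawStorage;
    #[async_trait]
    impl Storage for NoRawStorage {
        async fn get(&self, _key: &str) -> Result<Cache> {
            Ok(Cache::Miss)
        }
        async fn put(&self, _key: &str, _entry: CacheWrite) -> Result<Duration> {
            Ok(Duration::from_secs(0))
        }
        fn location(&self) -> String {
            "nowhere".to_string()
        }
        async fn current_size(&self) -> Result<Option<u64>> {
            Ok(None)
        }
        async fn max_size(&self) -> Result<Option<u64>> {
            Ok(None)
        }
    }
    #[test]
    fn test_raw_defaults_unsupported() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(async {
            let storage = NoRawStorage;
            // The default `get_raw` reports a miss...
            assert!(storage.get_raw("key").await.unwrap().is_none());
            // ...and the default `put_raw` refuses the write.
            assert!(storage.put_raw("key", Bytes::new()).await.is_err());
        });
    }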
}