use std::path::Path;
use sqlx::FromRow;
use uuid::Uuid;
use common::linked_data::Hash;
use common::prelude::Mime;
use crate::database::types::{DMime, DUuid, DbHash};
use crate::database::Database;
#[derive(Debug, Clone)]
pub struct GatewayCacheEntry {
pub link: Hash,
pub mime_type: Mime,
}
#[derive(FromRow)]
struct Row {
link: DbHash,
mime_type: DMime,
}
impl From<Row> for GatewayCacheEntry {
fn from(row: Row) -> Self {
Self {
link: row.link.into(),
mime_type: row.mime_type.into(),
}
}
}
impl GatewayCacheEntry {
pub async fn lookup(
bucket_id: &Uuid,
height: u64,
path: &Path,
query_string: Option<&str>,
db: &Database,
) -> Result<Option<Self>, sqlx::Error> {
let bid = DUuid::from(*bucket_id);
let path_str = path.to_string_lossy();
let qs = query_string.unwrap_or("");
let row = sqlx::query_as::<_, Row>(
"SELECT link, mime_type FROM gateway_cache
WHERE bucket_id = ? AND height = ? AND path = ? AND query_string = ?",
)
.bind(bid)
.bind(height as i64)
.bind(path_str.as_ref())
.bind(qs)
.fetch_optional(&**db)
.await?;
if row.is_some() {
let _ = sqlx::query(
"UPDATE gateway_cache SET last_accessed = unixepoch()
WHERE bucket_id = ? AND height = ? AND path = ? AND query_string = ?",
)
.bind(bid)
.bind(height as i64)
.bind(path_str.as_ref())
.bind(qs)
.execute(&**db)
.await;
}
Ok(row.map(Into::into))
}
#[allow(clippy::too_many_arguments)]
pub async fn log(
bucket_id: &Uuid,
height: u64,
path: &Path,
query_string: Option<&str>,
link: &Hash,
content_size: u64,
mime_type: &Mime,
db: &Database,
) -> Result<(), sqlx::Error> {
let bid = DUuid::from(*bucket_id);
let path_str = path.to_string_lossy();
let qs = query_string.unwrap_or("");
let dlink = DbHash::from(*link);
let dmime = DMime::from(mime_type.clone());
sqlx::query(
"INSERT OR IGNORE INTO gateway_cache
(bucket_id, height, path, query_string, link, content_size, mime_type,
created_at, last_accessed)
VALUES (?, ?, ?, ?, ?, ?, ?, unixepoch(), unixepoch())",
)
.bind(bid)
.bind(height as i64)
.bind(path_str.as_ref())
.bind(qs)
.bind(dlink)
.bind(content_size as i64)
.bind(dmime)
.execute(&**db)
.await?;
Ok(())
}
pub async fn evict_old_heights(keep_versions: u32, db: &Database) -> Result<u64, sqlx::Error> {
let result = sqlx::query(
"DELETE FROM gateway_cache
WHERE rowid IN (
SELECT gc.rowid FROM gateway_cache gc
WHERE gc.height < (
SELECT COALESCE(MIN(h), 0) FROM (
SELECT DISTINCT height AS h FROM gateway_cache gc2
WHERE gc2.bucket_id = gc.bucket_id
ORDER BY height DESC
LIMIT ?
)
)
)",
)
.bind(keep_versions)
.execute(&**db)
.await?;
Ok(result.rows_affected())
}
pub async fn evict_lru(max_total_size: u64, db: &Database) -> Result<Vec<String>, sqlx::Error> {
let total: i64 =
sqlx::query_scalar("SELECT COALESCE(SUM(content_size), 0) FROM gateway_cache")
.fetch_one(&**db)
.await?;
if total <= max_total_size as i64 {
return Ok(Vec::new());
}
let to_free = total - max_total_size as i64;
let mut freed: i64 = 0;
let mut removed_links = Vec::new();
let rows: Vec<(i64, String)> =
sqlx::query_as("SELECT rowid, link FROM gateway_cache ORDER BY last_accessed ASC")
.fetch_all(&**db)
.await?;
for (rowid, link) in rows {
if freed >= to_free {
break;
}
let size: i64 =
sqlx::query_scalar("SELECT content_size FROM gateway_cache WHERE rowid = ?")
.bind(rowid)
.fetch_one(&**db)
.await?;
sqlx::query("DELETE FROM gateway_cache WHERE rowid = ?")
.bind(rowid)
.execute(&**db)
.await?;
freed += size;
removed_links.push(link);
}
Ok(removed_links)
}
pub async fn evict_expired(
max_age_secs: u64,
db: &Database,
) -> Result<Vec<String>, sqlx::Error> {
let links: Vec<(String,)> = sqlx::query_as(
"SELECT DISTINCT link FROM gateway_cache
WHERE created_at < unixepoch() - ?",
)
.bind(max_age_secs as i64)
.fetch_all(&**db)
.await?;
sqlx::query("DELETE FROM gateway_cache WHERE created_at < unixepoch() - ?")
.bind(max_age_secs as i64)
.execute(&**db)
.await?;
Ok(links.into_iter().map(|(h,)| h).collect())
}
pub async fn referenced_links(db: &Database) -> Result<Vec<String>, sqlx::Error> {
let links: Vec<(String,)> = sqlx::query_as("SELECT DISTINCT link FROM gateway_cache")
.fetch_all(&**db)
.await?;
Ok(links.into_iter().map(|(h,)| h).collect())
}
#[cfg(test)]
pub async fn count(db: &Database) -> Result<i64, sqlx::Error> {
sqlx::query_scalar("SELECT COUNT(*) FROM gateway_cache")
.fetch_one(&**db)
.await
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_insert_and_lookup() {
let db = Database::memory().await.unwrap();
let bucket = Uuid::new_v4();
let link = Hash::new(b"test content");
let mime = mime::IMAGE_JPEG;
assert!(
GatewayCacheEntry::lookup(&bucket, 1, Path::new("/photo.jpg"), None, &db)
.await
.unwrap()
.is_none()
);
GatewayCacheEntry::log(
&bucket,
1,
Path::new("/photo.jpg"),
None,
&link,
1024,
&mime,
&db,
)
.await
.unwrap();
let entry = GatewayCacheEntry::lookup(&bucket, 1, Path::new("/photo.jpg"), None, &db)
.await
.unwrap()
.unwrap();
assert_eq!(entry.link, link);
assert_eq!(entry.mime_type, mime::IMAGE_JPEG);
}
#[tokio::test]
async fn test_query_string_differentiates_entries() {
let db = Database::memory().await.unwrap();
let bucket = Uuid::new_v4();
let link_orig = Hash::new(b"original");
let link_thumb = Hash::new(b"thumbnail");
let mime = mime::IMAGE_JPEG;
GatewayCacheEntry::log(
&bucket,
1,
Path::new("/photo.jpg"),
None,
&link_orig,
5000,
&mime,
&db,
)
.await
.unwrap();
GatewayCacheEntry::log(
&bucket,
1,
Path::new("/photo.jpg"),
Some("w=200"),
&link_thumb,
500,
&mime,
&db,
)
.await
.unwrap();
let original = GatewayCacheEntry::lookup(&bucket, 1, Path::new("/photo.jpg"), None, &db)
.await
.unwrap()
.unwrap();
assert_eq!(original.link, link_orig);
let thumb =
GatewayCacheEntry::lookup(&bucket, 1, Path::new("/photo.jpg"), Some("w=200"), &db)
.await
.unwrap()
.unwrap();
assert_eq!(thumb.link, link_thumb);
}
#[tokio::test]
async fn test_evict_old_heights() {
let db = Database::memory().await.unwrap();
let bucket = Uuid::new_v4();
for h in 1u64..=3 {
let link = Hash::new(format!("v{}", h).as_bytes());
GatewayCacheEntry::log(
&bucket,
h,
Path::new("/file.txt"),
None,
&link,
100,
&mime::TEXT_PLAIN,
&db,
)
.await
.unwrap();
}
assert_eq!(GatewayCacheEntry::count(&db).await.unwrap(), 3);
let removed = GatewayCacheEntry::evict_old_heights(1, &db).await.unwrap();
assert_eq!(removed, 2);
assert_eq!(GatewayCacheEntry::count(&db).await.unwrap(), 1);
assert!(
GatewayCacheEntry::lookup(&bucket, 3, Path::new("/file.txt"), None, &db)
.await
.unwrap()
.is_some()
);
assert!(
GatewayCacheEntry::lookup(&bucket, 1, Path::new("/file.txt"), None, &db)
.await
.unwrap()
.is_none()
);
}
}