use crate::{
database::{Database, DatabaseError},
query::{ImageQuery, TagQuery},
storage::{ImageMetadata, MediaPath, PixelHash, Storage, StorageError},
utils,
};
use std::{
collections::{HashMap, HashSet},
path::PathBuf,
};
use tokio::task::JoinSet;
/// Builder-style command that archives one image together with its tags and source.
///
/// Construct it with `ArchiveImageCommand::new`, optionally chain `with_tags` /
/// `with_source`, then run `execute`.
pub struct ArchiveImageCommand {
    /// Encoded image file bytes to store (e.g. the contents of a PNG file).
    pub bytes: Vec<u8>,
    /// Tags to apply. When non-empty, the image's tags are synchronised to exactly
    /// this set; when empty, existing tags are left untouched.
    pub tags: Vec<String>,
    /// Source string to record for the image (e.g. a URL), if any.
    pub source: Option<String>,
}
impl ArchiveImageCommand {
    /// Creates a command that will archive a copy of `bytes`, with no tags and no source.
    pub fn new(bytes: &[u8]) -> Self {
        ArchiveImageCommand {
            bytes: bytes.to_vec(),
            tags: vec![],
            source: None,
        }
    }

    /// Sets the tags to apply, replacing any previously configured tags.
    ///
    /// A non-empty list makes [`execute`](Self::execute) synchronise the image's tags to
    /// exactly this set via [`attach_tags`]; an empty list leaves existing tags untouched.
    pub fn with_tags<T: IntoIterator<Item = String>>(mut self, tags: T) -> Self {
        self.tags = tags.into_iter().collect();
        self
    }

    /// Sets the source string to record for the image.
    pub fn with_source(mut self, src: &str) -> Self {
        self.source = Some(src.to_string());
        self
    }

    /// Stores the image, records it in the database and returns the resulting [`Media`].
    ///
    /// If storage reports a hash collision, the already-stored file is reused. The whole
    /// sequence runs inside `utils::retry`, which presumably re-attempts it on failure;
    /// see that helper for the exact policy.
    ///
    /// # Errors
    ///
    /// Returns an [`AppError`] if a storage or database step fails (after whatever
    /// retries `utils::retry` performs).
    pub async fn execute(self, storage: &Storage, db: &Database) -> Result<Media, AppError> {
        utils::retry(|| async {
            let hash = self.ensure_image_in_storage(storage).await?;
            let metadata = storage.get_metadata(&hash)?;
            let media = self
                .ensure_image_in_db(storage, db, &hash, metadata)
                .await?;
            Ok(media)
        })
        .await
    }

    /// Registers the stored image and its metadata in the database.
    ///
    /// Tags are applied only when `self.tags` is non-empty and the source only when one
    /// is set. The image is then reloaded via [`find_image_by_hash`].
    async fn ensure_image_in_db(
        &self,
        storage: &Storage,
        db: &Database,
        hash: &PixelHash,
        metadata: ImageMetadata,
    ) -> Result<Media, AppError> {
        db.ensure_image(hash).await?;
        db.ensure_image_has_metadata(hash, &metadata).await?;
        if !self.tags.is_empty() {
            let tag_refs: Vec<&str> = self.tags.iter().map(String::as_str).collect();
            attach_tags(db, storage, hash, &tag_refs).await?;
        }
        if let Some(src) = &self.source {
            attach_source(db, storage, hash, src).await?;
        }
        find_image_by_hash(db, storage, hash).await
    }

    /// Writes the image bytes to storage and returns their hash.
    ///
    /// A `HashCollision` is not an error here. The existing entry's hash is returned,
    /// which makes re-archiving the same image idempotent at the storage level.
    async fn ensure_image_in_storage(&self, storage: &Storage) -> Result<PixelHash, AppError> {
        match storage.create_file(&self.bytes) {
            Ok(hash) => Ok(hash),
            Err(StorageError::HashCollision { hash, .. }) => Ok(hash),
            Err(e) => Err(e.into()),
        }
    }
}
/// Synchronises the tags of the image identified by `hash` to exactly `tags`.
///
/// Tags in `tags` that are not yet attached are added. Attached tags that are absent
/// from `tags` are removed. Duplicate entries in `tags` are ignored.
///
/// # Errors
///
/// Returns [`AppError::StorageNotFound`] if `storage` has no file for `hash`, or a
/// database error if reading or updating the tags fails.
pub async fn attach_tags(
    db: &Database,
    storage: &Storage,
    hash: &PixelHash,
    tags: &[&str],
) -> Result<(), AppError> {
    if storage.index_file(hash).is_none() {
        return Err(AppError::StorageNotFound { hash: hash.clone() });
    }
    let desired: HashSet<&str> = tags.iter().copied().collect();
    // Keep the owned tag list in its own binding; `current` borrows from it.
    let current_tags = db.get_tags(hash).await?;
    let current: HashSet<&str> = current_tags.iter().map(|t| t.as_str()).collect();
    let to_add: Vec<&str> = desired.difference(&current).copied().collect();
    let to_remove: Vec<&str> = current.difference(&desired).copied().collect();
    db.ensure_image_has_tags(hash, to_add.as_slice()).await?;
    db.ensure_tags_removed(hash, to_remove.as_slice()).await?;
    Ok(())
}
/// Records `src` as the source of the image identified by `hash`.
///
/// `db.ensure_image` is called before the source is written (presumably creating the
/// image record if it is missing).
///
/// # Errors
///
/// Returns [`AppError::StorageNotFound`] if `storage` has no file for `hash`, or a
/// database error if either database call fails.
pub async fn attach_source(
    db: &Database,
    storage: &Storage,
    hash: &PixelHash,
    src: &str,
) -> Result<(), AppError> {
    storage
        .index_file(hash)
        .ok_or_else(|| AppError::StorageNotFound { hash: hash.clone() })?;
    db.ensure_image(hash).await?;
    db.ensure_image_has_source(hash, src).await?;
    Ok(())
}
/// Deletes the image identified by `hash` from storage and then from the database.
///
/// The storage file is removed first. If the database step then fails, the error is
/// returned and the storage deletion is not rolled back.
///
/// # Errors
///
/// Propagates storage or database failures as [`AppError`].
pub async fn remove_image(
    storage: &Storage,
    db: &Database,
    hash: PixelHash,
) -> Result<(), AppError> {
    let target = &hash;
    storage.ensure_deleted(target)?;
    db.ensure_image_removed(target).await?;
    Ok(())
}
/// Loads the full [`Media`] record for the image identified by `hash`.
///
/// The storage path is resolved first. Tags, metadata and source are then read from
/// the database, in that order. Missing metadata falls back to its `Default` value.
///
/// # Errors
///
/// Returns [`AppError::StorageNotFound`] if `storage` has no file for `hash`, or a
/// database error if any lookup fails.
pub async fn find_image_by_hash(
    db: &Database,
    storage: &Storage,
    hash: &PixelHash,
) -> Result<Media, AppError> {
    let Some(path) = storage.index_file(hash) else {
        return Err(AppError::StorageNotFound { hash: hash.clone() });
    };
    let tags = db.get_tags(hash).await?;
    let metadata = db.get_metadata(hash).await?.unwrap_or_default();
    let source = db.get_source(hash).await?;
    Ok(Media {
        hash: hash.clone(),
        path,
        metadata,
        tags,
        source,
    })
}
pub async fn query_image(
db: &Database,
storage: &Storage,
query: ImageQuery,
) -> Result<Vec<Media>, AppError> {
let hashes = db.query_image(query).await?;
let mut set = JoinSet::new();
for hash in hashes.clone() {
let db = db.clone();
let storage = storage.clone();
set.spawn(async move {
let image = find_image_by_hash(&db, &storage, &hash)
.await
.unwrap_or(Media {
path: MediaPath::Image(PathBuf::from("unknown.png")),
hash: hash.clone(),
metadata: ImageMetadata::default(),
tags: vec![],
source: None,
});
(hash, image)
});
}
let mut map = HashMap::new();
while let Some(result) = set.join_next().await {
match result {
Ok((hash, image)) => {
map.insert(hash, image);
}
Err(join_err) => panic!("task panicked in image retrieval: {join_err}"),
}
}
let images = hashes.into_iter().filter_map(|h| map.remove(&h)).collect();
Ok(images)
}
/// Counts the images matching `query`.
///
/// # Errors
///
/// Propagates any database failure as [`AppError`].
pub async fn count_image(db: &Database, query: ImageQuery) -> Result<u64, AppError> {
    let total = db.count_image(query).await?;
    Ok(total)
}
pub async fn count_image_by_tag(db: &Database, tag: &str) -> Result<u64, AppError> {
Ok(db.count_image_by_tag(tag).await?)
}
/// Asks the database to refresh its image count (`Database::refresh_image_count`).
///
/// # Errors
///
/// Propagates any database failure as [`AppError`].
pub async fn refresh_count(db: &Database) -> Result<(), AppError> {
    db.refresh_image_count().await?;
    Ok(())
}
pub async fn query_tags(db: &Database, query: TagQuery) -> Result<Vec<String>, AppError> {
db.query_tags(query).await.map_err(AppError::from)
}
/// A stored image together with the information the database holds about it.
#[derive(Clone, Debug, PartialEq)]
pub struct Media {
    /// Location of the image file. Normally resolved via `Storage::index_file`;
    /// `query_image` substitutes `unknown.png` when the file is missing.
    pub path: MediaPath,
    /// Hash identifying the image in both storage and the database.
    pub hash: PixelHash,
    /// Image metadata; `ImageMetadata::default()` when none is recorded.
    pub metadata: ImageMetadata,
    /// Tags attached to the image.
    pub tags: Vec<String>,
    /// Recorded source of the image (e.g. a URL), if any.
    pub source: Option<String>,
}
/// Errors returned by the application-level operations in this module.
#[derive(Debug, thiserror::Error)]
pub enum AppError {
    /// A storage operation failed.
    #[error("storage error: {0}")]
    Storage(#[from] StorageError),
    /// A database operation failed.
    #[error("database error: {0}")]
    Database(#[from] DatabaseError),
    /// Storage has no file for the requested hash.
    #[error("image not found: {hash}")]
    StorageNotFound {
        /// The hash that was looked up.
        hash: PixelHash,
    },
}
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use crate::{
        app::{
            AppError, ArchiveImageCommand, Media, attach_tags, find_image_by_hash, query_image,
            remove_image,
        },
        database::{Database, MIGRATOR, Pool},
        query::{ImageQuery, ImageQueryExpr, ImageQueryKind},
        storage::{MediaPath, Storage},
    };
    use tempfile::TempDir;

    /// Fixture image shared by every test in this module.
    const TEST_IMAGE: &[u8] = include_bytes!("../testdata/44a5b6f94f4f6445.png");

    /// Creates a `Storage` rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the storage because dropping it deletes
    /// the directory. Callers must keep it alive for as long as the storage is used, by
    /// binding it to a named variable such as `_tmp` (not `_`, which drops it immediately).
    fn get_storage() -> (Storage, TempDir) {
        let tmp_dir = TempDir::new().unwrap();
        let storage = Storage::new(tmp_dir.path().to_path_buf());
        (storage, tmp_dir)
    }

    /// Archives `TEST_IMAGE` with the given tags and a fixed source URL.
    async fn archive_test_image(db: &Database, storage: &Storage, tags: &[&str]) -> Media {
        ArchiveImageCommand::new(TEST_IMAGE)
            .with_tags(tags.iter().map(|&t| t.to_string()))
            .with_source("https://example.com")
            .execute(storage, db)
            .await
            .unwrap()
    }

    #[sqlx::test(migrator = "MIGRATOR")]
    async fn test_query(pool: Pool) {
        let db = Database::new(pool);
        let (storage, _tmp) = get_storage();
        archive_test_image(&db, &storage, &["cat"]).await;
        let query = ImageQuery::new(ImageQueryKind::Where(ImageQueryExpr::tag("cat")));
        let res = query_image(&db, &storage, query).await.unwrap();
        assert!(!res.is_empty());
        assert!(res.iter().all(|m| m.tags.iter().any(|t| t == "cat")));
    }

    #[sqlx::test(migrator = "MIGRATOR")]
    async fn test_remove_image(pool: Pool) {
        let db = Database::new(pool);
        let (storage, _tmp) = get_storage();
        let image = archive_test_image(&db, &storage, &["cat"]).await;
        let hash = image.hash.clone();
        remove_image(&storage, &db, image.hash).await.unwrap();
        assert!(matches!(
            find_image_by_hash(&db, &storage, &hash).await,
            Err(AppError::StorageNotFound { .. })
        ));
    }

    #[sqlx::test(migrator = "MIGRATOR")]
    async fn test_attach_tags(pool: Pool) {
        let db = Database::new(pool);
        let (storage, _tmp) = get_storage();
        let image = archive_test_image(&db, &storage, &["cat", "scary"]).await;
        let desired = &["cat", "cute"];
        attach_tags(&db, &storage, &image.hash, desired)
            .await
            .unwrap();
        let mut tags = find_image_by_hash(&db, &storage, &image.hash)
            .await
            .unwrap()
            .tags;
        // The order returned by `db.get_tags` is not what is under test; compare sorted.
        tags.sort();
        assert_eq!(tags, desired.to_vec());
    }

    #[sqlx::test(migrator = "MIGRATOR")]
    async fn test_find_image_by_hash(pool: Pool) {
        let db = Database::new(pool);
        let (storage, _tmp) = get_storage();
        let image = archive_test_image(&db, &storage, &["cat"]).await;
        let found = find_image_by_hash(&db, &storage, &image.hash)
            .await
            .unwrap();
        assert_eq!(image, found);
    }

    #[sqlx::test(migrator = "MIGRATOR")]
    async fn test_query_image(pool: Pool) {
        let db = Database::new(pool);
        let (storage, _tmp) = get_storage();
        let image = archive_test_image(&db, &storage, &["cat"]).await;
        let query = ImageQuery::new(ImageQueryKind::Where(ImageQueryExpr::tag("cat")));
        let images = query_image(&db, &storage, query).await.unwrap();
        assert_eq!(images.len(), 1);
        assert_eq!(images[0], image);
    }

    #[sqlx::test(migrator = "MIGRATOR")]
    async fn test_query_image_but_missing_image_in_storage(pool: Pool) {
        let db = Database::new(pool);
        let (storage, _tmp) = get_storage();
        let image = archive_test_image(&db, &storage, &["cat"]).await;
        storage.ensure_deleted(&image.hash).unwrap();
        let query = ImageQuery::new(ImageQueryKind::Where(ImageQueryExpr::tag("cat")));
        let images = query_image(&db, &storage, query).await.unwrap();
        assert_eq!(images.len(), 1);
        assert_eq!(images[0].hash, image.hash);
        assert_eq!(
            images[0].path,
            MediaPath::Image(PathBuf::from("unknown.png"))
        );
    }
}