use std::env;
use std::sync::Arc;
use std::hash::{Hash, Hasher};
use std::collections::hash_map::DefaultHasher;
use dashmap::DashSet;
use log::{debug, warn};
#[allow(unused_imports)]
use metrics::{gauge, increment_gauge, decrement_gauge, counter, histogram};
#[allow(unused_imports)]
use crate::models::{Post, Image, PostUpdate, Board, Thread, ImageInfo, ImageJob,
ThreadInfo, ThreadJob, ThreadNo};
#[allow(unused_imports)]
use crate::util::strip_nullchars;
pub async fn sqlx_connection() -> sqlx::Pool<sqlx::Postgres> {
use sqlx::postgres::PgPoolOptions;
dotenv::dotenv().ok();
let pool = PgPoolOptions::new()
.max_connections(50)
.connect(&env::var("DATABASE_URL")
.expect("DATABASE_URL must be set")).await
.expect("Failed to connect to database");
pool
}
#[derive(Clone)]
pub struct DBClient {
pub pool: sqlx::Pool<sqlx::Postgres>,
post_hashes: Arc<DashSet<u64>>,
tinfo_hashes: Arc<DashSet<u64>>
}
impl DBClient {
pub async fn new() -> Self {
Self {
pool: sqlx_connection().await,
post_hashes: Arc::new(DashSet::new()),
tinfo_hashes: Arc::new(DashSet::new())
}
}
pub async fn get_image_backlog_size(&self, min_page: i32) -> anyhow::Result<i64> {
struct Count {
count: Option<i64>
}
let count = sqlx::query_as!(Count,
"
SELECT
count(*)
FROM image_backlog
WHERE page >= $1
",
min_page
).fetch_one(&self.pool).await?;
Ok(count.count.unwrap_or(0))
}
pub async fn get_thread_backlog_size(&self, min_page: i32) -> anyhow::Result<i64> {
struct Count {
count: Option<i64>
}
let count = sqlx::query_as!(Count,
"
SELECT
count(*)
FROM thread_backlog
WHERE page >= $1
",
min_page
).fetch_one(&self.pool).await?;
Ok(count.count.unwrap_or(0))
}
pub async fn get_stored_files(&self) -> anyhow::Result<i64> {
struct Count {
count: Option<i64>
}
let count = sqlx::query_as!(Count,
"
SELECT
count(*)
FROM posts
WHERE file_sha256 != ''
"
).fetch_one(&self.pool).await?;
Ok(count.count.unwrap_or(0))
}
pub async fn get_stored_thumbnails(&self) -> anyhow::Result<i64> {
struct Count {
count: Option<i64>
}
let count = sqlx::query_as!(Count,
"
SELECT
count(*)
FROM posts
WHERE thumbnail_sha256 != ''
"
).fetch_one(&self.pool).await?;
Ok(count.count.unwrap_or(0))
}
pub async fn get_missing_thumbnails(&self) -> anyhow::Result<i64> {
struct Count {
count: Option<i64>
}
let count = sqlx::query_as!(Count,
"
SELECT
count(*)
FROM posts
WHERE thumbnail_sha256 = ''
AND tim != 0 AND filedeleted = 0 AND deleted_on = 0
"
).fetch_one(&self.pool).await?;
Ok(count.count.unwrap_or(0))
}
pub async fn get_latest_images(&self, limit: i64, offset: i64, boards: Vec<String>) -> anyhow::Result<Vec<Post>> {
let posts = sqlx::query_as!(Post,
"
SELECT *
FROM posts
WHERE thumbnail_sha256 != '' AND board = ANY($1)
ORDER BY last_modified DESC
LIMIT $2 OFFSET $3
",
&boards,
limit,
offset
)
.fetch_all(&self.pool)
.await?;
Ok(posts)
}
pub async fn get_all_boards(&self) -> anyhow::Result<Vec<Board>> {
let boards = sqlx::query_as!(Board,
"SELECT * FROM boards ORDER BY name ASC"
).fetch_all(&self.pool)
.await?;
Ok(boards)
}
pub async fn get_board(&self, board_name: &String) -> anyhow::Result<Option<Board>> {
let board = sqlx::query_as!(Board,
"SELECT * FROM boards WHERE name = $1",
board_name,
).fetch_optional(&self.pool)
.await?;
Ok(board)
}
pub async fn delete_board(&self, board_name: &String) -> anyhow::Result<u64> {
let res: u64 = sqlx::query!(
"DELETE FROM boards WHERE name = $1",
board_name,
).execute(&self.pool)
.await?
.rows_affected();
Ok(res)
}
pub async fn insert_board(&self, board: &Board) -> anyhow::Result<Board> {
let job = sqlx::query_as!(Board,
"
INSERT INTO boards (name, full_images, archive)
VALUES
($1, $2, $3)
ON CONFLICT(name) DO
UPDATE SET
full_images = $2,
archive = $3
RETURNING *;
",
board.name,
board.full_images,
board.archive
).fetch_one(&self.pool)
.await?;
Ok(job)
}
pub async fn get_image_jobs(&self, limit: i64) -> anyhow::Result<Vec<ImageJob>> {
let jobs = sqlx::query_as!(ImageJob,
"
SELECT * FROM image_backlog
ORDER BY page DESC, id ASC
LIMIT $1
",
limit
).fetch_all(&self.pool)
.await?;
Ok(jobs)
}
pub async fn delete_image_job(&self, job_id: i64) -> anyhow::Result<u64> {
let res: u64 = sqlx::query!(
"DELETE FROM image_backlog WHERE id = $1",
job_id,
).execute(&self.pool)
.await?
.rows_affected();
Ok(res)
}
pub async fn get_image_job(&self, job_id: i64) -> anyhow::Result<Option<ImageJob>> {
let job = sqlx::query_as!(ImageJob,
"
SELECT * FROM image_backlog WHERE id = $1
",
job_id,
).fetch_optional(&self.pool)
.await?;
Ok(job)
}
pub async fn insert_image_job(&self, img: &ImageInfo) -> anyhow::Result<ImageJob> {
let job = sqlx::query_as!(ImageJob,
"
INSERT INTO image_backlog (
board, -- 1
no, -- 2
url, -- 3
thumbnail_url, -- 4
ext, -- 5
page, -- 6
file_sha256, -- 7
thumbnail_sha256 -- 8
)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT(board, no) DO UPDATE
SET
page = $6
WHERE image_backlog.board = $1 AND image_backlog.no = $2
RETURNING *;
",
img.board, img.no, img.url, img.thumbnail_url, img.ext, img.page, img.file_sha256, img.thumbnail_sha256 ).fetch_one(&self.pool)
.await?;
Ok(job)
}
pub async fn delete_thread_job(&self, job_id: i64) -> anyhow::Result<u64> {
let res: u64 = sqlx::query!(
"DELETE FROM thread_backlog WHERE id = $1",
job_id,
).execute(&self.pool)
.await?
.rows_affected();
Ok(res)
}
pub async fn get_thread_jobs(&self, limit: i64) -> anyhow::Result<Vec<ThreadJob>> {
let jobs = sqlx::query_as!(ThreadJob,
"
SELECT * FROM thread_backlog
ORDER BY page DESC, id ASC
LIMIT $1
",
limit
).fetch_all(&self.pool)
.await?;
Ok(jobs)
}
fn get_threadinfo_hash(&self, tinfo: &ThreadInfo) -> u64 {
let mut hasher = DefaultHasher::new();
tinfo.hash(& mut hasher);
hasher.finish()
}
fn insert_threadinfo_hash(&self, tinfo_hash: u64) {
if self.tinfo_hashes.len() > 100000000 {
warn!("Thread Job hash store reached over 100 million entries, clearing.");
self.tinfo_hashes.clear();
self.tinfo_hashes.shrink_to_fit();
}
self.tinfo_hashes.insert(tinfo_hash);
gauge!("thread_jobs_hashes", self.tinfo_hashes.len() as f64);
}
pub async fn insert_thread_job(&self, tinfo: &ThreadInfo) -> anyhow::Result<Option<ThreadJob>> {
let tinfo_hash = self.get_threadinfo_hash(&tinfo);
if self.tinfo_hashes.contains(&tinfo_hash) {
debug!("Skip adding duplicate thread job for /{}/{} [{}] ({} - {})",
tinfo.board, tinfo.no, tinfo.last_modified, tinfo.replies, tinfo.page);
return Ok(None)
}
if let Some(post) = self.get_post(&tinfo.board, tinfo.no).await? {
if post.last_modified >= tinfo.last_modified {
self.insert_threadinfo_hash(tinfo_hash);
return Ok(None)
}
}
let job = sqlx::query_as!(ThreadJob,
"
INSERT INTO thread_backlog (board, no, last_modified, replies, page)
VALUES
($1, $2, $3, $4, $5)
ON CONFLICT(board, no, last_modified) DO
UPDATE SET
replies = $4,
page = $5
RETURNING *;
",
tinfo.board,
tinfo.no,
tinfo.last_modified,
tinfo.replies,
tinfo.page
).fetch_one(&self.pool)
.await?;
counter!("thread_job_writes", 1);
self.insert_threadinfo_hash(tinfo_hash);
let res: u64 = sqlx::query!(
"DELETE FROM thread_backlog WHERE board = $1 AND no = $2 AND last_modified < $3",
tinfo.board,
tinfo.no,
tinfo.last_modified
).execute(&self.pool)
.await?
.rows_affected();
if res > 0 {
debug!("Deleting {} obsolete thread jobs", res);
}
Ok(Some(job))
}
pub async fn image_tim_to_sha256(&self, board: &String, image_tim: i64, thumb: bool) -> anyhow::Result<Option<String>> {
let post_opt = sqlx::query_as!(Post,
"
SELECT *
FROM posts
WHERE board = $1
AND tim = $2
",
board,
image_tim
).fetch_optional(&self.pool)
.await?;
if let Some(post) = post_opt {
if thumb && !post.thumbnail_sha256.is_empty() {
return Ok(Some(post.thumbnail_sha256))
}
if !thumb && !post.file_sha256.is_empty() {
return Ok(Some(post.file_sha256))
}
}
Ok(None)
}
pub async fn get_post(&self, board: &String, post_no: i64) -> anyhow::Result<Option<Post>> {
let post = sqlx::query_as!(Post,
"
SELECT *
FROM posts
WHERE board = $1 AND no = $2
",
board,
post_no
)
.fetch_optional(&self.pool)
.await?;
Ok(post)
}
pub async fn delete_post(&self, board: &String, post_no: i64) -> anyhow::Result<u64> {
let res: u64 = sqlx::query!(
"
DELETE FROM posts WHERE board = $1 AND no = $2
",
board,
post_no
).execute(&self.pool)
.await?
.rows_affected();
Ok(res)
}
pub async fn get_thread_index(&self, board: &String, index: i64, limit: i64) -> anyhow::Result<Vec<Thread>> {
let thread_ids = sqlx::query_as!(ThreadNo,
"
SELECT t1.resto FROM posts t1
LEFT JOIN posts t2
ON t1.resto = t2.resto AND t1.no < t2.no
WHERE t2.no IS NULL and t1.board = $1
ORDER BY t1.no DESC OFFSET $2 LIMIT $3
",
board,
index*limit,
limit
).fetch_all(&self.pool)
.await?;
let mut threads = Vec::new();
for thread_id in thread_ids {
if let Some(thread) = self.get_thread(board, thread_id.resto).await? {
threads.push(thread);
}
}
Ok(threads)
}
pub async fn get_thread(&self, board: &String, no: i64) -> anyhow::Result<Option<Thread>> {
let posts = sqlx::query_as!(Post,
"
SELECT *
FROM posts
WHERE board = $1
AND (no = $2 OR resto = $2)
ORDER BY no ASC
",
board,
no
)
.fetch_all(&self.pool)
.await?;
if posts.is_empty() {
return Ok(None);
}
if posts.len() == 1 && posts[0].resto != 0 {
return Ok(None);
}
Ok(Some(Thread{posts}))
}
pub async fn set_post_files(&self, board: &String, no: i64, file_sha256: &String, thumbnail_sha256: &String) -> anyhow::Result<Option<Post>> {
let post = sqlx::query_as!(Post,
"
UPDATE posts
SET
file_sha256 = $1,
thumbnail_sha256 = $2
WHERE board = $3 AND no = $4
RETURNING *
",
file_sha256,
thumbnail_sha256,
board,
no
).fetch_optional(&self.pool)
.await?;
Ok(post)
}
pub async fn set_post_deleted(&self, board: &String, no: i64, deleted_time: i64) -> anyhow::Result<Option<Post>> {
let post = sqlx::query_as!(Post,
"
UPDATE posts
SET
deleted_on = $1
WHERE board = $2 AND no = $3
RETURNING *
",
deleted_time,
board,
no
).fetch_optional(&self.pool)
.await?;
Ok(post)
}
pub async fn set_missing_posts_deleted(&self, board: &String, thread_no: i64, current_posts: Vec<i64>, deleted_time: i64) -> anyhow::Result<Vec<Post>> {
let post = sqlx::query_as!(Post,
"
UPDATE posts
SET
deleted_on = $1
WHERE board = $2 AND resto = $3 AND deleted_on = 0 AND no != ALL($4)
RETURNING *
",
deleted_time,
board,
thread_no,
¤t_posts
).fetch_all(&self.pool)
.await?;
Ok(post)
}
pub async fn get_files_exclusive_to_board(&self, board: &String) -> anyhow::Result<Vec<String>> {
struct Sha256Field {
file_sha256: Option<String>
}
let hashes: Vec<Sha256Field> = sqlx::query_as!(Sha256Field,
"
SELECT file_sha256 FROM posts WHERE board = $1 and file_sha256 != ''
EXCEPT
SELECT file_sha256 FROM posts WHERE board != $1 and file_sha256 != ''
",
board
).fetch_all(&self.pool)
.await?;
Ok(hashes.into_iter().filter(|h| h.file_sha256.is_some())
.map(|h| h.file_sha256.unwrap_or_default()).collect())
}
fn get_post_hash(&self, post: &Post) -> u64 {
let mut hasher = DefaultHasher::new();
let mut hash_post = post.clone();
hash_post.post_id = 0;
if hash_post.resto != 0 {
hash_post.last_modified = 0;
}
hash_post.file_sha256 = "".to_string();
hash_post.thumbnail_sha256 = "".to_string();
hash_post.hash(& mut hasher);
hasher.finish()
}
pub async fn insert_posts(&self, entries: &Vec<Post>) -> anyhow::Result<Vec<Post>> {
let mut posts = Vec::new();
for entry in entries {
let hash = self.get_post_hash(entry);
if self.post_hashes.contains(&hash) {
debug!("Post has not changed, skipped (/{}/{})", entry.board, entry.no);
continue;
}
let post = sqlx::query_as!(Post,
"
INSERT INTO posts(
board, -- 1
no, -- 2
resto, -- 3
sticky, -- 4
closed, -- 5
now, -- 6
time, -- 7
name, -- 8
trip, -- 9
id, -- 10
capcode, -- 11
country, -- 12
country_name, -- 13
sub, -- 14
com, -- 15
tim, -- 16
filename, -- 17
ext, -- 18
fsize, -- 19
md5, -- 20
w, -- 21
h, -- 22
tn_w, -- 23
tn_h, -- 24
filedeleted, -- 25
spoiler, -- 26
custom_spoiler, -- 27
replies, -- 28
images, -- 29
bumplimit, -- 30
imagelimit, -- 31
tag, -- 32
semantic_url, -- 33
since4pass, -- 34
unique_ips, -- 35
m_img, -- 36
archived, -- 37
archived_on, -- 38
last_modified, -- 39
file_sha256, -- 40
thumbnail_sha256, -- 41
deleted_on -- 42
)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19,
$20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, $35, $36, $37, $38, $39, $40, $41, $42)
ON CONFLICT (board, no) DO
UPDATE
SET
closed = $5,
sticky = $4,
com = $15,
filedeleted = $25,
spoiler = $26,
custom_spoiler = $27,
replies = $28,
images = $29,
bumplimit = $30,
imagelimit = $31,
unique_ips = CASE WHEN posts.unique_ips < $35 THEN $35 ELSE posts.unique_ips END,
archived = $37,
archived_on = $38,
last_modified = $39,
deleted_on = $42
WHERE posts.board = $1 AND posts.no = $2
RETURNING *;
",
entry.board, entry.no, entry.resto, entry.sticky, entry.closed, entry.now, entry.time, strip_nullchars(&entry.name), entry.trip, entry.id, entry.capcode, entry.country, entry.country_name, strip_nullchars(&entry.sub), strip_nullchars(&entry.com), entry.tim, strip_nullchars(&entry.filename), strip_nullchars(&entry.ext), entry.fsize, entry.md5, entry.w, entry.h, entry.tn_w, entry.tn_h, entry.filedeleted, entry.spoiler, entry.custom_spoiler, entry.replies, entry.images, entry.bumplimit, entry.imagelimit, strip_nullchars(&entry.tag), entry.semantic_url, entry.since4pass, entry.unique_ips, entry.m_img, entry.archived, entry.archived_on, entry.last_modified, entry.file_sha256, entry.thumbnail_sha256, entry.deleted_on )
.fetch_one(&self.pool)
.await?;
counter!("post_writes", 1);
if self.post_hashes.len() > 100000000 {
warn!("Post hash store reached over 100 million entries, clearing.");
self.post_hashes.clear();
self.post_hashes.shrink_to_fit();
}
self.post_hashes.insert(hash);
gauge!("post_hashes", self.post_hashes.len() as f64);
posts.push(post);
}
Ok(posts)
}
}
impl std::panic::UnwindSafe for DBClient {}
impl std::panic::RefUnwindSafe for DBClient {}
#[cfg(test)]
#[allow(non_snake_case)]
mod tests {
use super::*;
fn run_async<F: std::future::Future>(f: F) {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(f);
}
#[test]
fn test_migrations(){
run_async(migrations());
}
async fn migrations() {
let dbc = DBClient::new().await;
sqlx::migrate!()
.run(&dbc.pool)
.await.expect("Failed to migrate");
}
#[test]
fn test_post_insert_upsert(){
run_async(post_insert_upsert());
}
async fn post_insert_upsert() {
let dbc = DBClient::new().await;
let mut post1 = Post::default();
post1.board = "test".to_string();
post1.no = 10;
post1.time = 1337;
post1.images = 77;
let mut post2 = Post::default();
post2.board = "test".to_string();
post2.no = 11;
post2.time = 1559;
post2.images = 55;
let mut post3 = Post::default();
post3.board = "test".to_string();
post3.no = 12;
post3.time = 4444;
assert_eq!(3usize, dbc.insert_posts(&vec![post1.clone(), post2.clone(), post3.clone()]).await.unwrap().len());
post3.time = 5555;
post2.images = 2222;
post1.images = 1111;
assert_eq!(3usize, dbc.insert_posts(&vec![post1.clone(), post2.clone(), post3.clone()]).await.unwrap().len());
assert_eq!(1111, dbc.get_post(&post1.board, post1.no).await.unwrap().unwrap().images);
assert_eq!(2222, dbc.get_post(&post2.board, post2.no).await.unwrap().unwrap().images);
assert_eq!(4444, dbc.get_post(&post3.board, post3.no).await.unwrap().unwrap().time);
assert_eq!(1, dbc.delete_post(&post1.board, post1.no).await.unwrap());
assert_eq!(1, dbc.delete_post(&post2.board, post2.no).await.unwrap());
assert_eq!(1, dbc.delete_post(&post3.board, post3.no).await.unwrap());
}
#[test]
fn test_post_nullchars(){
run_async(post_insert_nullchars());
}
async fn post_insert_nullchars() {
let dbc = DBClient::new().await;
let mut post1 = Post::default();
post1.board = "test".to_string();
post1.no = 10;
post1.time = 1337;
post1.images = 77;
post1.com = "test \u{00} test \u{00}".to_string();
assert_eq!(1usize, dbc.insert_posts(&vec![post1.clone()]).await.unwrap().len());
assert_eq!(77, dbc.get_post(&post1.board, post1.no).await.unwrap().unwrap().images);
assert_eq!(1, dbc.delete_post(&post1.board, post1.no).await.unwrap());
}
#[test]
fn post_history(){
run_async(post_update_com());
}
async fn post_update_com() {
let dbc = DBClient::new().await;
let mut post1 = Post::default();
post1.board = "test".to_string();
post1.no = 10;
post1.com = "Comment2".to_string();
assert_eq!(1usize, dbc.insert_posts(&vec![post1.clone()]).await.unwrap().len());
post1.images = 77;
assert_eq!(1usize, dbc.insert_posts(&vec![post1.clone()]).await.unwrap().len());
assert_eq!(77, dbc.get_post(&post1.board, post1.no).await.unwrap().unwrap().images);
post1.com = "Comment Changed".to_string();
assert_eq!(1usize, dbc.insert_posts(&vec![post1.clone()]).await.unwrap().len());
post1.com = "Comment Changed Again".to_string();
assert_eq!(1usize, dbc.insert_posts(&vec![post1.clone()]).await.unwrap().len());
assert_eq!(post1.com, dbc.get_post(&post1.board, post1.no).await.unwrap().unwrap().com);
assert_eq!(1, dbc.delete_post(&post1.board, post1.no).await.unwrap());
}
#[test]
fn test_post_update(){
run_async(post_update());
}
async fn post_update(){
let dbc = DBClient::new().await;
let mut post1 = Post::default();
post1.board = "test".to_string();
post1.no = 10;
dbc.insert_posts(&vec![post1.clone()]).await.unwrap();
post1.images = 55;
assert_eq!(1usize, dbc.insert_posts(&vec![post1.clone()]).await.unwrap().len());
assert_eq!(55, dbc.get_post(&post1.board, post1.no).await.unwrap().unwrap().images);
post1.time = 500;
assert_eq!(1usize, dbc.insert_posts(&vec![post1.clone()]).await.unwrap().len());
assert_eq!(0, dbc.get_post(&post1.board, post1.no).await.unwrap().unwrap().time);
post1.unique_ips = 30;
assert_eq!(1usize, dbc.insert_posts(&vec![post1.clone()]).await.unwrap().len());
assert_eq!(30, dbc.get_post(&post1.board, post1.no).await.unwrap().unwrap().unique_ips);
post1.unique_ips = 0;
assert_eq!(1usize, dbc.insert_posts(&vec![post1.clone()]).await.unwrap().len());
assert_eq!(30, dbc.get_post(&post1.board, post1.no).await.unwrap().unwrap().unique_ips);
assert_eq!(1u64, dbc.delete_post(&post1.board, post1.no).await.unwrap());
assert_eq!(None, dbc.get_post(&post1.board, post1.no).await.unwrap());
}
#[test]
fn test_post_deleted_detection(){
run_async(post_deleted());
}
async fn post_deleted(){
let dbc = DBClient::new().await;
let mut post1 = Post::default();
post1.board = "test".to_string();
post1.resto = 1;
post1.no = 1;
let mut post2 = Post::default();
post2.board = "test".to_string();
post2.resto = 1;
post2.no = 2;
let mut post3 = Post::default();
post3.board = "test".to_string();
post3.resto = 1;
post3.no = 3;
let mut post4 = Post::default();
post4.board = "test".to_string();
post4.resto = 1;
post4.no = 4;
dbc.insert_posts(&vec![post1.clone(), post2.clone(), post3.clone(), post4.clone()]).await.unwrap();
assert_eq!(0usize, dbc.set_missing_posts_deleted(&post1.board, post1.resto, vec![post1.no, post2.no, post3.no, post4.no], 10).await.unwrap().len());
let nos: Vec<(i64, i64)> = dbc.set_missing_posts_deleted(&post1.board, post1.resto, vec![post1.no, post2.no, post4.no], 10).await
.unwrap().iter().map(|p| (p.no, p.deleted_on)).collect();
assert_eq!(1, nos.len());
assert_eq!((3, 10), nos[0]);
let nos: Vec<(i64, i64)> = dbc.set_missing_posts_deleted(&post1.board, post1.resto, vec![post1.no, post2.no], 20).await
.unwrap().iter().map(|p| (p.no, p.deleted_on)).collect();
assert_eq!(1, nos.len());
assert_eq!((4, 20), nos[0]);
let nos: Vec<(i64, i64)> = dbc.set_missing_posts_deleted(&post1.board, post1.resto, vec![post1.no], 30).await
.unwrap().iter().map(|p| (p.no, p.deleted_on)).collect();
assert_eq!(1, nos.len());
assert_eq!((2, 30), nos[0]);
assert_eq!(0usize, dbc.set_missing_posts_deleted(&post1.board, post1.resto, vec![post1.no], 10).await.unwrap().len());
assert_eq!(1u64, dbc.delete_post(&post1.board, post1.no).await.unwrap());
assert_eq!(1u64, dbc.delete_post(&post2.board, post2.no).await.unwrap());
assert_eq!(1u64, dbc.delete_post(&post3.board, post3.no).await.unwrap());
assert_eq!(1u64, dbc.delete_post(&post4.board, post4.no).await.unwrap());
assert_eq!(None, dbc.get_post(&post1.board, post1.no).await.unwrap());
}
#[test]
fn test_job_update(){
run_async(job_insert_update());
}
async fn job_insert_update() {
let dbc = DBClient::new().await;
let mut img = ImageInfo::default();
img.board = "test_a".to_string();
img.no = 777;
img.url = "url1".to_string();
img.page = 5;
let img_job = dbc.insert_image_job(&img).await.unwrap();
img.url = "url2".to_string();
img.page = 6;
assert_eq!(img_job.id, dbc.insert_image_job(&img).await.unwrap().id);
let mut img_b = ImageInfo::default();
img_b.board = "test_b".to_string();
img_b.no = 777;
img_b.url = "urlB".to_string();
img_b.page = 77;
let img_b_job = dbc.insert_image_job(&img_b).await.unwrap();
assert_eq!(false, img_job.id == img_b_job.id);
assert_eq!("url1".to_string(), dbc.get_image_job(img_job.id).await.unwrap().unwrap().url);
assert_eq!(6, dbc.get_image_job(img_job.id).await.unwrap().unwrap().page);
assert_eq!("urlB".to_string(), dbc.get_image_job(img_b_job.id).await.unwrap().unwrap().url);
assert_eq!(77, dbc.get_image_job(img_b_job.id).await.unwrap().unwrap().page);
assert_eq!(1u64, dbc.delete_image_job(img_b_job.id).await.unwrap());
assert_eq!(1u64, dbc.delete_image_job(img_job.id).await.unwrap());
assert_eq!(None, dbc.get_image_job(img_job.id).await.unwrap());
assert_eq!(None, dbc.get_image_job(img_b_job.id).await.unwrap());
}
#[test]
fn test_many_post_update(){
run_async(many_post_update());
}
async fn many_post_update(){
let dbc = DBClient::new().await;
let mut post = dbc.get_post(&"vip".to_string(), 103205).await.unwrap().unwrap();
post.board = "test".to_string();
for i in 0..100000 {
post.last_modified = i;
dbc.insert_posts(&vec![post.clone()]).await.unwrap();
}
}
#[test]
fn test_image_filtering(){
run_async(get_images_board());
}
async fn get_images_board(){
let dbc = DBClient::new().await;
let hashes = dbc.get_files_exclusive_to_board(&"vip".to_string()).await.unwrap();
println!("{}", hashes.len());
}
#[test]
fn test_count(){
run_async(get_image_backlog_count());
}
async fn get_image_backlog_count(){
let dbc = DBClient::new().await;
println!("{}", dbc.get_image_backlog_size(2).await.unwrap());
}
}