use crate::error::DatabaseError;
use crate::types::DownloadId;
use crate::{Error, Result};
use super::{Article, Database, NewArticle, article_status};
impl Database {
pub async fn insert_article(&self, article: &NewArticle) -> Result<i64> {
let result = sqlx::query(
r#"
INSERT INTO download_articles (
download_id, message_id, segment_number, file_index, size_bytes, status
) VALUES (?, ?, ?, ?, ?, 0)
"#,
)
.bind(article.download_id)
.bind(&article.message_id)
.bind(article.segment_number)
.bind(article.file_index)
.bind(article.size_bytes)
.execute(&self.pool)
.await
.map_err(|e| {
Error::Database(DatabaseError::QueryFailed(format!(
"Failed to insert article: {}",
e
)))
})?;
Ok(result.last_insert_rowid())
}
/// Bulk-inserts article rows, chunked so each multi-row INSERT stays
/// within SQLite's bound-parameter budget (6 binds per row).
///
/// Every row is inserted with `status = 0` (pending). An empty slice is
/// a no-op.
///
/// # Errors
/// Returns `DatabaseError::QueryFailed` if any chunk's INSERT fails;
/// earlier chunks that already executed are not rolled back.
pub async fn insert_articles_batch(&self, articles: &[NewArticle]) -> Result<()> {
    // 166 rows * 6 binds = 996 parameters — presumably chosen to stay
    // under SQLite's historical default limit of 999 host parameters.
    const ARTICLES_PER_CHUNK: usize = 166;

    if articles.is_empty() {
        return Ok(());
    }

    for batch in articles.chunks(ARTICLES_PER_CHUNK) {
        let mut builder = sqlx::QueryBuilder::new(
            "INSERT INTO download_articles (download_id, message_id, segment_number, file_index, size_bytes, status) ",
        );
        builder.push_values(batch, |mut row, a| {
            row.push_bind(a.download_id)
                .push_bind(&a.message_id)
                .push_bind(a.segment_number)
                .push_bind(a.file_index)
                .push_bind(a.size_bytes)
                .push_bind(0);
        });

        builder.build().execute(&self.pool).await.map_err(|e| {
            Error::Database(DatabaseError::QueryFailed(format!(
                "Failed to insert articles batch: {}",
                e
            )))
        })?;
    }

    Ok(())
}
/// Sets the status of a single article addressed by primary key.
///
/// `downloaded_at` is stamped with the current Unix time when the new
/// status is `DOWNLOADED`, and reset to NULL for any other status.
///
/// # Errors
/// Returns `DatabaseError::QueryFailed` if the UPDATE cannot be executed.
pub async fn update_article_status(&self, article_id: i64, status: i32) -> Result<()> {
    // Only a successful download earns a completion timestamp.
    let downloaded_at = if status == article_status::DOWNLOADED {
        Some(chrono::Utc::now().timestamp())
    } else {
        None
    };

    sqlx::query(
        r#"
UPDATE download_articles
SET status = ?, downloaded_at = ?
WHERE id = ?
"#,
    )
    .bind(status)
    .bind(downloaded_at)
    .bind(article_id)
    .execute(&self.pool)
    .await
    .map(|_| ())
    .map_err(|e| {
        Error::Database(DatabaseError::QueryFailed(format!(
            "Failed to update article status: {}",
            e
        )))
    })
}
/// Sets the status of an article addressed by `(download_id, message_id)`.
///
/// `downloaded_at` is stamped with the current Unix time when the new
/// status is `DOWNLOADED`, and reset to NULL for any other status.
///
/// # Errors
/// Returns `DatabaseError::QueryFailed` if the UPDATE cannot be executed.
pub async fn update_article_status_by_message_id(
    &self,
    download_id: DownloadId,
    message_id: &str,
    status: i32,
) -> Result<()> {
    // Only a successful download earns a completion timestamp.
    let downloaded_at =
        (status == article_status::DOWNLOADED).then(|| chrono::Utc::now().timestamp());

    sqlx::query(
        r#"
UPDATE download_articles
SET status = ?, downloaded_at = ?
WHERE download_id = ? AND message_id = ?
"#,
    )
    .bind(status)
    .bind(downloaded_at)
    .bind(download_id)
    .bind(message_id)
    .execute(&self.pool)
    .await
    .map(|_| ())
    .map_err(|e| {
        Error::Database(DatabaseError::QueryFailed(format!(
            "Failed to update article status: {}",
            e
        )))
    })
}
/// Applies many `(article_id, status)` updates with one statement per chunk.
///
/// Builds `UPDATE download_articles SET status = CASE ... END,
/// downloaded_at = CASE ... END WHERE id IN (...)` so each chunk costs a
/// single round trip. Note the asymmetry with `update_article_status`:
/// here rows whose new status is not DOWNLOADED *keep* their existing
/// `downloaded_at` (via `THEN downloaded_at`), rather than having it
/// reset to NULL.
///
/// # Errors
/// Returns `DatabaseError::QueryFailed` if any chunk's UPDATE fails;
/// earlier chunks that already executed are not rolled back.
pub async fn update_articles_status_batch(&self, updates: &[(i64, i32)]) -> Result<()> {
    if updates.is_empty() {
        return Ok(());
    }
    // Up to 5 binds per update (2 + up to 2 + 1) -> <= 500 parameters per
    // statement, presumably to stay under SQLite's default 999-parameter
    // limit — TODO confirm against the build's SQLITE_MAX_VARIABLE_NUMBER.
    const MAX_UPDATES_PER_BATCH: usize = 100;
    // One timestamp for the whole batch, so all DOWNLOADED rows agree.
    let now = chrono::Utc::now().timestamp();
    for chunk in updates.chunks(MAX_UPDATES_PER_BATCH) {
        let mut query_builder =
            sqlx::QueryBuilder::new("UPDATE download_articles SET status = CASE ");
        // status = CASE WHEN id = ? THEN ? ... END
        for (article_id, status) in chunk {
            query_builder.push("WHEN id = ");
            query_builder.push_bind(*article_id);
            query_builder.push(" THEN ");
            query_builder.push_bind(*status);
            query_builder.push(" ");
        }
        query_builder.push("END, downloaded_at = CASE ");
        // downloaded_at: stamp `now` for DOWNLOADED rows, otherwise
        // re-assign the column to itself (i.e. leave it unchanged).
        for (article_id, status) in chunk {
            query_builder.push("WHEN id = ");
            query_builder.push_bind(*article_id);
            if *status == article_status::DOWNLOADED {
                query_builder.push(" THEN ");
                query_builder.push_bind(now);
            } else {
                query_builder.push(" THEN downloaded_at"); }
            query_builder.push(" ");
        }
        // Restrict the UPDATE to exactly the ids in this chunk; every id
        // is covered by both CASEs, so no row hits a NULL ELSE branch.
        query_builder.push("END WHERE id IN (");
        let mut first = true;
        for (article_id, _) in chunk {
            if !first {
                query_builder.push(", ");
            }
            query_builder.push_bind(*article_id);
            first = false;
        }
        query_builder.push(")");
        let query = query_builder.build();
        query.execute(&self.pool).await.map_err(|e| {
            Error::Database(DatabaseError::QueryFailed(format!(
                "Failed to update articles status batch: {}",
                e
            )))
        })?;
    }
    Ok(())
}
/// Fetches every article row belonging to a download, ordered by
/// `file_index` then `segment_number`.
///
/// # Errors
/// Returns `DatabaseError::QueryFailed` if the SELECT cannot be executed.
pub async fn get_articles(&self, download_id: DownloadId) -> Result<Vec<Article>> {
    const SQL: &str = r#"
SELECT id, download_id, message_id, segment_number, file_index, size_bytes, status, downloaded_at
FROM download_articles
WHERE download_id = ?
ORDER BY file_index ASC, segment_number ASC
"#;

    sqlx::query_as::<_, Article>(SQL)
        .bind(download_id)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| {
            Error::Database(DatabaseError::QueryFailed(format!(
                "Failed to get articles: {}",
                e
            )))
        })
}
/// Fetches the articles of a download that are still pending
/// (`status = 0` — presumably `article_status::PENDING`; verify against
/// the constants module), ordered by `file_index` then `segment_number`.
///
/// # Errors
/// Returns `DatabaseError::QueryFailed` if the SELECT cannot be executed.
pub async fn get_pending_articles(&self, download_id: DownloadId) -> Result<Vec<Article>> {
    const SQL: &str = r#"
SELECT id, download_id, message_id, segment_number, file_index, size_bytes, status, downloaded_at
FROM download_articles
WHERE download_id = ? AND status = 0
ORDER BY file_index ASC, segment_number ASC
"#;

    sqlx::query_as::<_, Article>(SQL)
        .bind(download_id)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| {
            Error::Database(DatabaseError::QueryFailed(format!(
                "Failed to get pending articles: {}",
                e
            )))
        })
}
/// Looks up a single article by `(download_id, message_id)`.
///
/// Returns `Ok(None)` when no matching row exists.
///
/// # Errors
/// Returns `DatabaseError::QueryFailed` if the SELECT cannot be executed.
pub async fn get_article_by_message_id(
    &self,
    download_id: DownloadId,
    message_id: &str,
) -> Result<Option<Article>> {
    const SQL: &str = r#"
SELECT id, download_id, message_id, segment_number, file_index, size_bytes, status, downloaded_at
FROM download_articles
WHERE download_id = ? AND message_id = ?
"#;

    sqlx::query_as::<_, Article>(SQL)
        .bind(download_id)
        .bind(message_id)
        .fetch_optional(&self.pool)
        .await
        .map_err(|e| {
            Error::Database(DatabaseError::QueryFailed(format!(
                "Failed to get article: {}",
                e
            )))
        })
}
/// Counts the articles of a download that currently have the given status.
///
/// # Errors
/// Returns `DatabaseError::QueryFailed` if the query cannot be executed.
pub async fn count_articles_by_status(
    &self,
    download_id: DownloadId,
    status: i32,
) -> Result<i64> {
    let count: i64 = sqlx::query_scalar(
        "SELECT COUNT(*) FROM download_articles WHERE download_id = ? AND status = ?",
    )
    .bind(download_id)
    .bind(status)
    .fetch_one(&self.pool)
    .await
    .map_err(|e| {
        // Message distinct from `count_articles`'s so a log line
        // identifies which of the two COUNT queries failed.
        Error::Database(DatabaseError::QueryFailed(format!(
            "Failed to count articles by status: {}",
            e
        )))
    })?;
    Ok(count)
}
/// Counts all article rows belonging to a download, regardless of status.
///
/// # Errors
/// Returns `DatabaseError::QueryFailed` if the query cannot be executed.
pub async fn count_articles(&self, download_id: DownloadId) -> Result<i64> {
    sqlx::query_scalar("SELECT COUNT(*) FROM download_articles WHERE download_id = ?")
        .bind(download_id)
        .fetch_one(&self.pool)
        .await
        .map_err(|e| {
            Error::Database(DatabaseError::QueryFailed(format!(
                "Failed to count articles: {}",
                e
            )))
        })
}
pub async fn delete_articles(&self, download_id: DownloadId) -> Result<()> {
sqlx::query("DELETE FROM download_articles WHERE download_id = ?")
.bind(download_id)
.execute(&self.pool)
.await
.map_err(|e| {
Error::Database(DatabaseError::QueryFailed(format!(
"Failed to delete articles: {}",
e
)))
})?;
Ok(())
}
/// Bulk-inserts download-file rows, chunked so each multi-row INSERT
/// stays within SQLite's bound-parameter budget (5 binds per row).
///
/// An empty slice is a no-op.
///
/// # Errors
/// Returns `DatabaseError::QueryFailed` if any chunk's INSERT fails;
/// earlier chunks that already executed are not rolled back.
pub async fn insert_files_batch(&self, files: &[super::NewDownloadFile]) -> Result<()> {
    // 199 rows * 5 binds = 995 parameters — presumably chosen to stay
    // under SQLite's historical default limit of 999 host parameters.
    const FILES_PER_CHUNK: usize = 199;

    if files.is_empty() {
        return Ok(());
    }

    for batch in files.chunks(FILES_PER_CHUNK) {
        let mut builder = sqlx::QueryBuilder::new(
            "INSERT INTO download_files (download_id, file_index, filename, subject, total_segments) ",
        );
        builder.push_values(batch, |mut row, file| {
            row.push_bind(file.download_id)
                .push_bind(file.file_index)
                .push_bind(&file.filename)
                .push_bind(&file.subject)
                .push_bind(file.total_segments);
        });

        builder.build().execute(&self.pool).await.map_err(|e| {
            Error::Database(DatabaseError::QueryFailed(format!(
                "Failed to insert files batch: {}",
                e
            )))
        })?;
    }

    Ok(())
}
/// Fetches every file row belonging to a download, ordered by `file_index`.
///
/// # Errors
/// Returns `DatabaseError::QueryFailed` if the SELECT cannot be executed.
pub async fn get_download_files(
    &self,
    download_id: DownloadId,
) -> Result<Vec<super::DownloadFile>> {
    const SQL: &str = r#"
SELECT id, download_id, file_index, filename, subject, total_segments
FROM download_files
WHERE download_id = ?
ORDER BY file_index ASC
"#;

    sqlx::query_as::<_, super::DownloadFile>(SQL)
        .bind(download_id)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| {
            Error::Database(DatabaseError::QueryFailed(format!(
                "Failed to get download files: {}",
                e
            )))
        })
}
}