use criterium::CriteriumChain;
use criterium::rusqlite::AssembleRusqliteQuery;
use log::trace;
use rusqlite::params_from_iter;
use url::Url;
use uuid::Uuid;
use std::collections::HashMap;
use crate::crawling::CrawlLogEntry;
use crate::crawling::CrawlType;
use crate::crawling::ExitCode;
use crate::criterium::CrawlLogEntryCriterium;
use crate::database::crawler::structs::CrawlerDatabase;
use crate::database::DatabaseError;
use crate::database::error::SmuggleDatabaseErrorExtension;
use crate::database::id::AgentId;
use crate::database::id::CrawlLogEntryId;
use crate::database::id::NumericDatabseId;
use crate::database::id::UrlId;
use crate::database::IntoWithNumericId;
use crate::database::Page;
use crate::database::sqlite_helper::*;
use crate::database::WithNumericId;
use super::structs::CrawlerDatabaseTransaction;
impl CrawlerDatabase {
pub fn add_to_crawl_log(
&mut self,
entry: CrawlLogEntry
) -> Result<CrawlLogEntryId, DatabaseError> {
self.base().assert_writable("add_to_crawl_log")?;
trace!("crawler_db.add_to_crawl_log()");
let url_id = self.base_mut().get_or_add_url_id(&entry.url)?;
self.connection().execute(
"INSERT INTO crawl_log (
agent_id,
url_id,
crawl_type,
crawl_uuid,
time_started_unix_utc,
time_taken_ms,
exit_code,
message
) VALUES (
?,?,?,?, ?,?,?,?
)
", (
entry.agent_id,
url_id,
entry.crawl_type.to_number(),
entry.crawl_uuid,
entry.time_started.timestamp(),
entry.time_taken_ms,
entry.exit_code.to_number(),
entry.message,
)
)?;
return Ok(CrawlLogEntryId::new(self.connection().last_insert_rowid()));
}
pub fn get_crawl_log_entry(
&self,
id: CrawlLogEntryId
) -> Result<CrawlLogEntry, DatabaseError> {
trace!("crawler_db.get_crawl_log_entry()");
return self.connection().query_row(
"SELECT
agent_id,
url_id,
crawl_type,
crawl_uuid,
time_started_unix_utc,
time_taken_ms,
exit_code,
message
FROM crawl_log
WHERE crawl_log_id = ?
", (id,),
|row| { Ok(CrawlLogEntry{
agent_id: row.get(0)?,
url: self.base().get_url_by_id(row.get(1)?)
.smuggle_through_rusqlite()?,
crawl_type: CrawlType::from_number(row.get(2)?),
crawl_uuid: row.get(3)?,
time_started: from_unix_timestamp_or_epoch(row.get(4)?),
time_taken_ms: row.get(5)?,
exit_code: ExitCode::from_number(row.get(6)?),
message: row.get(7)?,
}) }
).map_err(Into::into);
}
pub fn get_crawl_log_entry_by_uuid(
&self,
uuid: Uuid
) -> Result<CrawlLogEntry, DatabaseError> {
trace!("crawler_db.get_crawl_log_entry()");
return self.connection().query_row(
"SELECT
agent_id,
url_id,
crawl_type,
crawl_uuid,
time_started_unix_utc,
time_taken_ms,
exit_code,
message
FROM crawl_log
WHERE crawl_uuid = ?
", (uuid,),
|row| { Ok(CrawlLogEntry{
agent_id: row.get(0)?,
url: self.base().get_url_by_id(row.get(1)?)
.smuggle_through_rusqlite()?,
crawl_type: CrawlType::from_number(row.get(2)?),
crawl_uuid: row.get(3)?,
time_started: from_unix_timestamp_or_epoch(row.get(4)?),
time_taken_ms: row.get(5)?,
exit_code: ExitCode::from_number(row.get(6)?),
message: row.get(7)?,
}) }
).map_err(Into::into);
}
pub fn get_crawl_log_entry_bulk(
&self,
ids: &[CrawlLogEntryId]
) -> Result<HashMap<CrawlLogEntryId,CrawlLogEntry>, DatabaseError> {
trace!("crawler_db.get_crawl_log_entry_bulk()");
let mut get_crawl_log_entry_statement = self.connection().prepare_cached("
SELECT
agent_id,
url_id,
crawl_type,
crawl_uuid,
time_started_unix_utc,
time_taken_ms,
exit_code,
message
FROM crawl_log
WHERE crawl_log_id = ?
")?;
let mut results = HashMap::with_capacity(ids.len());
for id in ids {
results.insert(*id, get_crawl_log_entry_statement.query_row(
(id,), |row| {
Ok(CrawlLogEntry{
agent_id: AgentId::new(row.get(0)?),
url: self.base().get_url_by_id(row.get(1)?)
.smuggle_through_rusqlite()?,
crawl_type: CrawlType::from_number(row.get(2)?),
crawl_uuid: row.get(3)?,
time_started: from_unix_timestamp_or_epoch(row.get(4)?),
time_taken_ms: row.get(5)?,
exit_code: ExitCode::from_number(row.get(6)?),
message: row.get(7)?,
})
})?);
}
return Ok(results);
}
pub fn get_last_crawl_exit_codes_for_url(
&self, url_id: UrlId, page: &Page,
) -> Result<Vec<ExitCode>, DatabaseError> {
trace!("crawler_db.get_last_crawl_exit_codes_for_url()");
let mut selector = self.connection().prepare_cached(
"SELECT
exit_code
FROM crawl_log
WHERE url_id = ? AND NOT (exit_code is null)
ORDER BY time_started_unix_utc DESC
LIMIT ?
OFFSET ?
")?;
return selector.query_map(
(url_id, page.limit(), page.offset()),
|row| { Ok(ExitCode::from_number(row.get(0)?)) }
)?.map(|r| r.map_err(Into::into)).collect();
}
pub fn get_crawl_log_entries(
&self,
page: &Page,
criterium_chain: CriteriumChain<CrawlLogEntryCriterium>
) -> Result<Vec<WithNumericId<CrawlLogEntry, CrawlLogEntryId>>, DatabaseError> {
trace!("crawler_db.get_crawl_log_entries()");
let mut query = criterium_chain.assemble_rusqlite_query_for_db(
&()
);
trace!("SQL where: {}", query.sql_where_clause);
trace!("SQL where values: {:?}", query.where_values);
trace!("SQL joins: {}", query.joins_to_sql());
let mut get_crawl_log_entries_statement = self.connection().prepare(
format!("
SELECT
crawl_log.agent_id,
crawl_log.url_id,
crawl_log.crawl_type,
crawl_log.crawl_uuid,
crawl_log.time_started_unix_utc,
crawl_log.time_taken_ms,
crawl_log.exit_code,
crawl_log.message,
crawl_log.crawl_log_id
FROM crawl_log
{}
WHERE {}
LIMIT ?
OFFSET ?",
query.joins_to_sql(),
query.sql_where_clause,
).as_str()
)?;
query.where_values.push(page.limit().into());
query.where_values.push(page.offset().into());
return get_crawl_log_entries_statement.query_map(
query.where_values_as_params(),
|row| {
Ok(CrawlLogEntry{
agent_id: AgentId::new(row.get(0)?),
url: self.base().get_url_by_id(row.get(1)?)
.smuggle_through_rusqlite()?,
crawl_type: CrawlType::from_number(row.get(2)?),
crawl_uuid: row.get(3)?,
time_started: from_unix_timestamp_or_epoch(row.get(4)?),
time_taken_ms: row.get(5)?,
exit_code: ExitCode::from_number(row.get(6)?),
message: row.get(7)?,
}.with_numeric_id(CrawlLogEntryId::new(row.get(8)?)))
}
)?.map(|r| r.map_err(Into::into)).collect();
}
pub fn get_urls_with_more_than_n_crawl_log_entries(
&self,
page: &Page,
n: i64,
) -> Result<Vec<Url>, DatabaseError> {
trace!("crawler_db.get_urls_with_more_than_n_crawl_log_entries()");
let select_url_statement = self.connection().prepare("
SELECT url.str_url
FROM crawl_log
INNER JOIN url ON crawl_log.url_id = url.url_id
GROUP BY crawl_log.url_id
HAVING COUNT(*) > ?
LIMIT ?
OFFSET ?
");
return select_url_statement?.query_map(
(n, page.limit(), page.offset()),
|row| {
row.get::<usize, Url>(0)
}
)?.map(|r| r.map_err(Into::into)).collect();
}
}
impl CrawlerDatabaseTransaction<'_> {
/// Deletes the given crawl log entries together with every row that
/// references them, child tables first (file_text, file, redirect,
/// request) and the crawl_log rows last, so no dangling references
/// are left if the transaction commits.
pub fn delete_crawl_log_entries_by_id(
&self,
crawl_log_entry_ids: &[CrawlLogEntryId]
) -> Result<(), DatabaseError> {
trace!("crawler_transaction.delete_crawl_log_entries_by_id()");
// file_text rows hang off file rows, so they go first.
self.chunked_crawl_log_execute("
DELETE
FROM file_text
WHERE file_text.file_id IN (
SELECT file_id
FROM file
WHERE file.crawl_log_id IN (?)
)
", crawl_log_entry_ids)?;
self.chunked_crawl_log_execute("
DELETE
FROM file
WHERE file.crawl_log_id IN (?)
", crawl_log_entry_ids)?;
self.chunked_crawl_log_execute("
DELETE
FROM redirect
WHERE redirect.crawl_log_id IN (?)
", crawl_log_entry_ids)?;
self.chunked_crawl_log_execute("
DELETE
FROM request
WHERE request.crawl_log_id IN (?)
", crawl_log_entry_ids)?;
self.chunked_crawl_log_execute("
DELETE
FROM crawl_log
WHERE crawl_log.crawl_log_id IN (?)
", crawl_log_entry_ids)?;
Ok(())
}
/// Executes `base_query` for all `crawl_log_entry_ids`, expanding its
/// single `?` placeholder into `IN`-lists of at most 256 bound
/// parameters per statement (SQLite caps the number of bound
/// parameters per statement).
///
/// `base_query` must contain exactly one `?`: every occurrence is
/// expanded, so a second one would make the placeholder count
/// mismatch the bound ids.
fn chunked_crawl_log_execute(
&self,
base_query: &str,
crawl_log_entry_ids: &[CrawlLogEntryId]
) -> Result<(), DatabaseError> {
// Nothing to do — and no point preparing a statement that would
// never be executed.
if crawl_log_entry_ids.is_empty() {
return Ok(());
}
let mut chunks = crawl_log_entry_ids.chunks_exact(256);
if chunks.len() > 0 {
// All full-sized chunks share one cached 256-placeholder statement.
let mut x256_query = self.connection().prepare_cached(
&base_query.replace("?", &("?".to_owned()+&(",?".repeat(255))))
)?;
for chunk in chunks.by_ref() {
x256_query.execute(params_from_iter(chunk.iter()))?;
}
}
let remainder = chunks.remainder();
if !remainder.is_empty() {
// The remainder length varies between calls, so caching this
// statement would only churn the statement cache.
let mut remaining_query = self.connection().prepare(
&base_query.replace("?", &("?".to_owned()+&(",?".repeat(remainder.len() - 1))))
)?;
remaining_query.execute(params_from_iter(remainder.iter()))?;
}
Ok(())
}
}