unobtanium 3.0.0

Opinionated Web search engine library with crawler and viewer companion.
Documentation
use criterium::CriteriumChain;
use criterium::rusqlite::AssembleRusqliteQuery;
use log::trace;
use rusqlite::params_from_iter;
use url::Url;
use uuid::Uuid;

use std::collections::HashMap;

use crate::crawling::CrawlLogEntry;
use crate::crawling::CrawlType;
use crate::crawling::ExitCode;
use crate::criterium::CrawlLogEntryCriterium;
use crate::database::crawler::structs::CrawlerDatabase;
use crate::database::DatabaseError;
use crate::database::error::SmuggleDatabaseErrorExtension;
use crate::database::id::AgentId;
use crate::database::id::CrawlLogEntryId;
use crate::database::id::NumericDatabseId;
use crate::database::id::UrlId;
use crate::database::IntoWithNumericId;
use crate::database::Page;
use crate::database::sqlite_helper::*;
use crate::database::WithNumericId;

use super::structs::CrawlerDatabaseTransaction;

impl CrawlerDatabase {
	pub fn add_to_crawl_log(
		&mut self,
		entry: CrawlLogEntry
	) -> Result<CrawlLogEntryId, DatabaseError> {
		self.base().assert_writable("add_to_crawl_log")?;
		trace!("crawler_db.add_to_crawl_log()");
		let url_id = self.base_mut().get_or_add_url_id(&entry.url)?;
		self.connection().execute(
			"INSERT INTO crawl_log (
				agent_id,
				url_id,
				crawl_type,
				crawl_uuid,
				time_started_unix_utc,
				time_taken_ms,
				exit_code,
				message
			) VALUES (
				?,?,?,?, ?,?,?,?
			)
			", (
				entry.agent_id,
				url_id,
				entry.crawl_type.to_number(),
				entry.crawl_uuid,
				entry.time_started.timestamp(),
				entry.time_taken_ms,
				entry.exit_code.to_number(),
				entry.message,
			)
		)?;
		return Ok(CrawlLogEntryId::new(self.connection().last_insert_rowid()));
	}

	pub fn get_crawl_log_entry(
		&self,
		id: CrawlLogEntryId
	) -> Result<CrawlLogEntry, DatabaseError> {
		trace!("crawler_db.get_crawl_log_entry()");
		return self.connection().query_row(
			"SELECT
				agent_id,
				url_id,
				crawl_type,
				crawl_uuid,
				time_started_unix_utc,
				time_taken_ms,
				exit_code,
				message
			FROM crawl_log
			WHERE crawl_log_id = ?
			", (id,),
			|row| { Ok(CrawlLogEntry{
				agent_id: row.get(0)?,
				url: self.base().get_url_by_id(row.get(1)?)
					.smuggle_through_rusqlite()?,
				crawl_type: CrawlType::from_number(row.get(2)?),
				crawl_uuid: row.get(3)?,
				time_started: from_unix_timestamp_or_epoch(row.get(4)?),
				time_taken_ms: row.get(5)?,
				exit_code: ExitCode::from_number(row.get(6)?),
				message: row.get(7)?,
			}) }
		).map_err(Into::into);

	}

	pub fn get_crawl_log_entry_by_uuid(
		&self,
		uuid: Uuid
	) -> Result<CrawlLogEntry, DatabaseError> {
		trace!("crawler_db.get_crawl_log_entry()");
		return self.connection().query_row(
			"SELECT
				agent_id,
				url_id,
				crawl_type,
				crawl_uuid,
				time_started_unix_utc,
				time_taken_ms,
				exit_code,
				message
			FROM crawl_log
			WHERE crawl_uuid = ?
			", (uuid,),
			|row| { Ok(CrawlLogEntry{
				agent_id: row.get(0)?,
				url: self.base().get_url_by_id(row.get(1)?)
					.smuggle_through_rusqlite()?,
				crawl_type: CrawlType::from_number(row.get(2)?),
				crawl_uuid: row.get(3)?,
				time_started: from_unix_timestamp_or_epoch(row.get(4)?),
				time_taken_ms: row.get(5)?,
				exit_code: ExitCode::from_number(row.get(6)?),
				message: row.get(7)?,
			}) }
		).map_err(Into::into);

	}


	pub fn get_crawl_log_entry_bulk(
		&self,
		ids: &[CrawlLogEntryId]
	) -> Result<HashMap<CrawlLogEntryId,CrawlLogEntry>, DatabaseError> {
		trace!("crawler_db.get_crawl_log_entry_bulk()");
		let mut get_crawl_log_entry_statement = self.connection().prepare_cached("
			SELECT
				agent_id,
				url_id,
				crawl_type,
				crawl_uuid,
				time_started_unix_utc,
				time_taken_ms,
				exit_code,
				message
			FROM crawl_log
			WHERE crawl_log_id = ?
		")?;
		let mut results = HashMap::with_capacity(ids.len());
		for id in ids {
			results.insert(*id, get_crawl_log_entry_statement.query_row(
				(id,), |row| {
				Ok(CrawlLogEntry{
					agent_id: AgentId::new(row.get(0)?),
					url: self.base().get_url_by_id(row.get(1)?)
						.smuggle_through_rusqlite()?,
					crawl_type: CrawlType::from_number(row.get(2)?),
					crawl_uuid: row.get(3)?,
					time_started: from_unix_timestamp_or_epoch(row.get(4)?),
					time_taken_ms: row.get(5)?,
					exit_code: ExitCode::from_number(row.get(6)?),
					message: row.get(7)?,
				})
			})?);
		}
		return Ok(results);
	}

	pub fn get_last_crawl_exit_codes_for_url(
		&self, url_id: UrlId, page: &Page,
	) -> Result<Vec<ExitCode>, DatabaseError> {
		trace!("crawler_db.get_last_crawl_exit_codes_for_url()");
		let mut selector = self.connection().prepare_cached(
			"SELECT
				exit_code
			FROM crawl_log
			WHERE url_id = ? AND NOT (exit_code is null)
			ORDER BY time_started_unix_utc DESC
			LIMIT ?
			OFFSET ?
		")?;
		return selector.query_map(
			(url_id, page.limit(), page.offset()),
			|row| { Ok(ExitCode::from_number(row.get(0)?)) }
		)?.map(|r| r.map_err(Into::into)).collect();

	}

	pub fn get_crawl_log_entries(
		&self,
		page: &Page,
		criterium_chain: CriteriumChain<CrawlLogEntryCriterium>
	) -> Result<Vec<WithNumericId<CrawlLogEntry, CrawlLogEntryId>>, DatabaseError> {
		trace!("crawler_db.get_crawl_log_entries()");
		let mut query = criterium_chain.assemble_rusqlite_query_for_db(
			&()
		);
		trace!("SQL where: {}", query.sql_where_clause);
		trace!("SQL where values: {:?}", query.where_values);
		trace!("SQL joins: {}", query.joins_to_sql());
		let mut get_crawl_log_entries_statement = self.connection().prepare(
			format!("
			SELECT
				crawl_log.agent_id,
				crawl_log.url_id,
				crawl_log.crawl_type,
				crawl_log.crawl_uuid,
				crawl_log.time_started_unix_utc,
				crawl_log.time_taken_ms,
				crawl_log.exit_code,
				crawl_log.message,
				crawl_log.crawl_log_id
			FROM crawl_log
			{}
			WHERE {}
			LIMIT ?
			OFFSET ?",
				query.joins_to_sql(),
				query.sql_where_clause,
			).as_str()
		)?;
		query.where_values.push(page.limit().into());
		query.where_values.push(page.offset().into());
		return get_crawl_log_entries_statement.query_map(
			query.where_values_as_params(),
			|row| {
				Ok(CrawlLogEntry{
					agent_id: AgentId::new(row.get(0)?),
					url: self.base().get_url_by_id(row.get(1)?)
						.smuggle_through_rusqlite()?,
					crawl_type: CrawlType::from_number(row.get(2)?),
					crawl_uuid: row.get(3)?,
					time_started: from_unix_timestamp_or_epoch(row.get(4)?),
					time_taken_ms: row.get(5)?,
					exit_code: ExitCode::from_number(row.get(6)?),
					message: row.get(7)?,
				}.with_numeric_id(CrawlLogEntryId::new(row.get(8)?)))
			}
		)?.map(|r| r.map_err(Into::into)).collect();
		
	}

	pub fn get_urls_with_more_than_n_crawl_log_entries(
		&self,
		page: &Page,
		n: i64,
	) -> Result<Vec<Url>, DatabaseError> {
		trace!("crawler_db.get_urls_with_more_than_n_crawl_log_entries()");
		let select_url_statement = self.connection().prepare("
		SELECT url.str_url
			FROM crawl_log
			INNER JOIN url ON crawl_log.url_id = url.url_id
			GROUP BY crawl_log.url_id
			HAVING COUNT(*) > ?
			LIMIT ?
			OFFSET ?
		");
		return select_url_statement?.query_map(
			(n, page.limit(), page.offset()),
			|row| {
				row.get::<usize, Url>(0)
			}
		)?.map(|r| r.map_err(Into::into)).collect();
	}

}

impl CrawlerDatabaseTransaction<'_> {

	/// Deletes the given crawl log entries together with every row in the
	/// dependent tables (file_text, file, redirect, request) that references
	/// them.
	///
	/// Child tables are cleared before crawl_log itself — NOTE(review):
	/// presumably so no dangling crawl_log_id references remain; confirm
	/// against the schema's foreign-key setup.
	pub fn delete_crawl_log_entries_by_id(
		&self,
		crawl_log_entry_ids: &[CrawlLogEntryId]
	) -> Result<(), DatabaseError> {
		trace!("crawler_transaction.delete_crawl_log_entries_by_id()");
		// file_text rows reference file rows, which in turn reference
		// crawl_log entries, so file_text goes first.
		self.chunked_crawl_log_execute("
		DELETE
			FROM file_text
			WHERE file_text.file_id IN (
				SELECT file_id
				FROM file
				WHERE file.crawl_log_id IN (?)
			)
		", crawl_log_entry_ids)?;
		self.chunked_crawl_log_execute("
		DELETE
			FROM file
			WHERE file.crawl_log_id IN (?)
		", crawl_log_entry_ids)?;
		self.chunked_crawl_log_execute("
		DELETE
			FROM redirect
			WHERE redirect.crawl_log_id IN (?)
		", crawl_log_entry_ids)?;
		self.chunked_crawl_log_execute("
		DELETE
			FROM request
			WHERE request.crawl_log_id IN (?)
		", crawl_log_entry_ids)?;
		self.chunked_crawl_log_execute("
		DELETE
			FROM crawl_log
			WHERE crawl_log.crawl_log_id IN (?)
		", crawl_log_entry_ids)?;
		Ok(())
	}

	/// Builds a comma-separated list of `n` SQL placeholders, e.g. "?,?,?"
	/// for n == 3 (empty string for n == 0).
	fn sql_placeholder_list(n: usize) -> String {
		vec!["?"; n].join(",")
	}

	/// Executes `base_query` — which must contain exactly one `?` marking an
	/// IN-list — for all of `crawl_log_entry_ids`, in chunks of up to 256
	/// ids, expanding the `?` into a matching placeholder list per chunk.
	///
	/// Chunking keeps each statement comfortably below SQLite's bound-
	/// parameter limit. The fixed 256-wide statement is prepared through the
	/// statement cache (it is reused across calls); the variable-width
	/// remainder statement is not cached. An empty id slice is a no-op.
	fn chunked_crawl_log_execute(
		&self,
		base_query: &str,
		crawl_log_entry_ids: &[CrawlLogEntryId]
	) -> Result<(), DatabaseError> {
		const CHUNK_SIZE: usize = 256;
		let full_chunks = crawl_log_entry_ids.chunks_exact(CHUNK_SIZE);
		let remainder = full_chunks.remainder();
		if crawl_log_entry_ids.len() >= CHUNK_SIZE {
			// Only prepare (and cache) the 256-wide statement when there is
			// at least one full chunk to run it on.
			let mut full_chunk_statement = self.connection().prepare_cached(
				&base_query.replace("?", &Self::sql_placeholder_list(CHUNK_SIZE))
			)?;
			for chunk in full_chunks {
				full_chunk_statement.execute(params_from_iter(chunk.iter()))?;
			}
		}
		if !remainder.is_empty() {
			let mut remaining_statement = self.connection().prepare(
				&base_query.replace("?", &Self::sql_placeholder_list(remainder.len()))
			)?;
			remaining_statement.execute(params_from_iter(remainder.iter()))?;
		}
		Ok(())
	}

}