unobtanium 3.0.0

Opinionated Web search engine library with crawler and viewer companion.
Documentation
use criterium::CriteriumChain;
use criterium::rusqlite::AssembleRusqliteQuery;
use rusqlite::OptionalExtension;
use url::Url;
use uuid::Uuid;
use log::trace;

use crate::criterium::EntityCriterium;
use crate::database::error::SmuggleDatabaseErrorExtension;
use crate::database::id::EntityGenerationId;
use crate::database::id::NumericDatabseId;
use crate::database::id::UrlId;
use crate::database::DatabaseError;
use crate::database::EntityComponentTable;
use crate::database::Page;
use crate::database::sqlite_helper::*;
use crate::database::summary::structs::SummaryDatabase;
use crate::database::summary::structs::SummaryDatabaseTransaction;
use crate::search::FullTextEntityWeights;
use crate::summary::EntityGeneration;
use crate::summary::WithEntityGenerationId;
use crate::time::UtcTimestamp;
use crate::types::Blake2b512Digest;
use crate::url::UrlWithoutFragment;

impl SummaryDatabase {
	
	/// Loads the entity generation stored under the given numeric database id.
	///
	/// The URL is rebuilt by `url_with_fragment_from_row` from the `url_id`
	/// and `url_fragment` columns; the timestamp columns are converted with
	/// the `from_unix_timestamp_*` helpers.
	///
	/// # Errors
	/// Any error produced by the query or by the row conversion is turned
	/// into a [`DatabaseError`].
	pub fn get_entity_generation(
		&self,
		entity_generation_id: EntityGenerationId,
	) -> Result<EntityGeneration, DatabaseError> {
		trace!("summary_db.get_entity_generation()");
		let loaded = self.connection().query_row(
			"SELECT
				url_id,
				first_seen_unix_utc,
				last_seen_unix_utc,
				confirmed_end_unix_utc,
				marked_duplicate,
				entity_generation_uuid,
				url_fragment
			FROM entity_generation
			WHERE entity_generation_id = ?
			",
			(entity_generation_id,),
			|row| {
				// Column 0 is the url id, column 6 the fragment.
				let url = url_with_fragment_from_row(self.base(), row, 0, 6)
					.smuggle_through_rusqlite()?;
				let first_seen = from_unix_timestamp_or_epoch(row.get(1)?);
				let last_seen = from_unix_timestamp_or_epoch(row.get(2)?);
				let time_end_confirmed = from_unix_timestamp_opt(row.get(3)?);
				Ok(EntityGeneration {
					url,
					first_seen,
					last_seen,
					time_end_confirmed,
					marked_duplicate: row.get(4)?,
					uuid: row.get(5)?,
				})
			},
		);
		loaded.map_err(Into::into)
	}

	/// Resolves an entity generation UUID to its numeric database id.
	///
	/// The id cache is consulted first. On a cache miss the id is read from
	/// the `entity_generation` table and then pushed into the cache so later
	/// lookups for the same UUID skip the query.
	///
	/// # Errors
	/// Returns a [`DatabaseError`] when the query fails; this includes the
	/// case where no row matches, which `query_row` reports as an error.
	///
	/// # Panics
	/// Panics if acquiring the cache lock fails (the `unwrap()` on
	/// `read()` / `write()`).
	pub fn get_entity_generation_id(
		&self,
		entity_generation_uuid: Uuid,
	) -> Result<EntityGenerationId, DatabaseError> {
		// The message previously named `get_entity_generation()`, which made
		// trace logs point at the wrong function.
		trace!("summary_db.get_entity_generation_id()");
		if let Some(id) = self.cache.entity_generation_id.read().unwrap().get_id(&entity_generation_uuid) {
			return Ok(id);
		}
		let id: EntityGenerationId = self.connection().query_row(
			"SELECT
				entity_generation_id
			FROM entity_generation
			WHERE entity_generation_uuid = ?
			",(entity_generation_uuid,),
			|row| { row.get(0) }
		)?;
		// Remember the mapping for subsequent lookups.
		self.cache.entity_generation_id.write().unwrap().push(id, &entity_generation_uuid);
		return Ok(id);
	}

	/// Loads an entity generation by its UUID, together with its numeric id.
	///
	/// The UUID itself is not read back from the database; the value passed
	/// in is copied into the returned [`EntityGeneration`].
	///
	/// # Errors
	/// Any error produced by the query or by the row conversion is turned
	/// into a [`DatabaseError`].
	pub fn get_entity_generation_by_uuid(
		&self,
		uuid: Uuid,
	) -> Result<WithEntityGenerationId<EntityGeneration>, DatabaseError> {
		trace!("summary_db.get_entity_generation_by_uuid()");
		let lookup = self.connection().query_row(
			"SELECT
				entity_generation_id,
				url_id,
				first_seen_unix_utc,
				last_seen_unix_utc,
				confirmed_end_unix_utc,
				marked_duplicate,
				url_fragment
			FROM entity_generation
			WHERE entity_generation_uuid = ?
			",
			(uuid,),
			|row| {
				let entity_generation_id = row.get(0)?;
				// Column 1 is the url id, column 6 the fragment.
				let url = url_with_fragment_from_row(self.base(), row, 1, 6)
					.smuggle_through_rusqlite()?;
				let first_seen = from_unix_timestamp_or_epoch(row.get(2)?);
				let last_seen = from_unix_timestamp_or_epoch(row.get(3)?);
				let time_end_confirmed = from_unix_timestamp_opt(row.get(4)?);
				let data = EntityGeneration {
					url,
					first_seen,
					last_seen,
					time_end_confirmed,
					marked_duplicate: row.get(5)?,
					uuid,
				};
				Ok(WithEntityGenerationId { entity_generation_id, data })
			},
		);
		lookup.map_err(Into::into)
	}

	/// Fetches one page of entity generations matching a criterium chain.
	///
	/// The chain is assembled into joins, a `WHERE` clause and bound values
	/// for the `entity_generation` table. `page.limit()` and `page.offset()`
	/// are appended as the last two bound values for `LIMIT ?` / `OFFSET ?`.
	///
	/// An `ORDER BY` clause is only emitted when `weights` is given and the
	/// assembled `WHERE` clause contains the text `MATCH`; otherwise the
	/// statement carries no explicit ordering.
	///
	/// # Errors
	/// Returns a [`DatabaseError`] when preparing or running the statement
	/// fails, or when a row can't be converted.
	pub fn get_entity_generations(
		&self,
		page: &Page,
		criterium_chain: CriteriumChain<EntityCriterium>,
		weights: Option<FullTextEntityWeights>,
	) -> Result<Vec<EntityGeneration>, DatabaseError> {
		trace!("summary_db.get_entity_generations()");
		let mut query = criterium_chain.assemble_rusqlite_query_for_db(
			&EntityComponentTable::EntityGeneration
		);
		let mut order_by_clause = String::new();
		if let Some(weights) = weights {
			if query.sql_where_clause.contains("MATCH") {
				// Order by bm25 and give a huge boost to articles
				// This must be tuned a bit in the future
				order_by_clause.push_str("ORDER BY ");
				order_by_clause.push_str(&weights.to_bm25_sql());
				order_by_clause.push_str(" + CASE document_description.indexiness > 0 ");
				order_by_clause.push_str(" WHEN TRUE THEN 100 ELSE 0 END");
			}
		}
		trace!("SQL where: {}", query.sql_where_clause);
		trace!("SQL where values: {:?}", query.where_values);
		trace!("SQL joins: {}", query.joins_to_sql());
		// NOTE(review): an earlier comment here stated that "the subquery"
		// makes heavy use of the entity_generation_by_time_started index and
		// will absolutely not scale otherwise. No subquery is visible in this
		// statement itself — confirm whether that still applies to what the
		// criterium chain generates.
		let sql = format!("
			SELECT
				entity_generation.entity_generation_uuid,
				entity_generation.url_id,
				entity_generation.url_fragment,
				entity_generation.first_seen_unix_utc,
				entity_generation.last_seen_unix_utc,
				entity_generation.confirmed_end_unix_utc,
				entity_generation.marked_duplicate
			FROM entity_generation
			{}
			WHERE {}
			{} --order by clause
			LIMIT ?
			OFFSET ?",
			query.joins_to_sql(),
			query.sql_where_clause,
			order_by_clause,
		);
		let mut page_statement = self.connection().prepare(sql.as_str())?;

		// LIMIT and OFFSET are the final two placeholders of the statement.
		query.where_values.push(page.limit().into());
		query.where_values.push(page.offset().into());

		let rows = page_statement.query_map(
			query.where_values_as_params(),
			|row| {
				let uuid = row.get(0)?;
				// Column 1 is the url id, column 2 the fragment.
				let url = url_with_fragment_from_row(self.base(), row, 1, 2)
					.smuggle_through_rusqlite()?;
				let first_seen = from_unix_timestamp_or_epoch(row.get(3)?);
				let last_seen = from_unix_timestamp_or_epoch(row.get(4)?);
				let time_end_confirmed = from_unix_timestamp_opt(row.get(5)?);
				Ok(EntityGeneration {
					uuid,
					url,
					first_seen,
					last_seen,
					time_end_confirmed,
					marked_duplicate: row.get(6)?,
				})
			},
		)?;
		// Bound to a local first so nothing borrowing the statement is still
		// alive in the tail expression.
		let generations: Result<Vec<EntityGeneration>, DatabaseError> =
			rows.map(|row_result| row_result.map_err(Into::into)).collect();
		generations
	}

	/// Counts the entity generations matching a criterium chain.
	///
	/// Uses the same joins and `WHERE` clause assembly as the paged lookup,
	/// but selects `COUNT(*)` and applies no limit or ordering.
	///
	/// # Errors
	/// Returns a [`DatabaseError`] when preparing or running the statement
	/// fails.
	pub fn count_entity_generations(
		&self,
		criterium_chain: CriteriumChain<EntityCriterium>
	) -> Result<u64, DatabaseError> {
		trace!("summary_db.count_entity_generations()");
		let query = criterium_chain.assemble_rusqlite_query_for_db(
			&EntityComponentTable::EntityGeneration
		);
		trace!("SQL where: {}", query.sql_where_clause);
		trace!("SQL where values: {:?}", query.where_values);
		trace!("SQL joins: {}", query.joins_to_sql());
		let sql = format!("
			SELECT
				COUNT(*)
			FROM entity_generation
			{}
			WHERE {}
			",
			query.joins_to_sql(),
			query.sql_where_clause,
		);
		let mut count_statement = self.connection().prepare(sql.as_str())?;

		// The single result column holds the count.
		let total = count_statement.query_row(
			query.where_values_as_params(),
			|row| row.get::<_, u64>(0),
		);
		total.map_err(Into::into)
	}

	/// Looks up the UUID of an entity generation by URL and text pile digest.
	///
	/// A row matches when its `url_id` belongs to `url` (without fragment),
	/// its `url_fragment` is the fragment of `url` (compared with `is`, so a
	/// missing fragment matches `NULL`), and the joined text pile has the
	/// given blake2b512 digest.
	///
	/// Returns `Ok(None)` when the URL has no id in the database or when no
	/// row matches.
	///
	/// NOTE(review): despite "open" in the name, the query does not filter on
	/// `confirmed_end_unix_utc` — confirm whether already closed generations
	/// are meant to be returned here.
	///
	/// # Errors
	/// Returns a [`DatabaseError`] when the URL id lookup, preparing the
	/// statement, or running it fails.
	pub fn get_open_entity_generation_uuid_by_content(
		&self,
		url: &Url,
		text_pile_blake2b512_digest: &Blake2b512Digest
	) -> Result<Option<Uuid>, DatabaseError> {
		// The message previously named `get_entity_generation_by_content()`,
		// which is not the name of this function.
		trace!("summary_db.get_open_entity_generation_uuid_by_content()");
		// Resolve the URL first: without a URL id there can't be a matching
		// generation, so the statement doesn't need to be prepared at all.
		let url_id = self.base().read_url_id(&url.clone().into())
			.smuggle_through_rusqlite().optional()?;
		let url_id = match url_id {
			Some(url_id) => url_id,
			None => return Ok(None),
		};
		let mut get_uuid_by_content_statement = self.connection().prepare("
			SELECT
				entity_generation_uuid
			FROM entity_generation
			INNER JOIN text_pile ON entity_generation.text_pile_id = text_pile.text_pile_id
			WHERE entity_generation.url_id = ?
				AND text_pile.blake2b512_digest = ?
				AND entity_generation.url_fragment is ?
		")?;
		let uuid_opt: Option<Uuid> = get_uuid_by_content_statement.query_row((
			url_id,
			text_pile_blake2b512_digest,
			url.fragment()
		), |row| {
			row.get(0)
		}).optional()?;
		Ok(uuid_opt)
	}

}

///////////////////////////////////////////////////////////////////////////////
// Transaction

impl SummaryDatabaseTransaction<'_> {
	/// Resolves an entity generation UUID to its numeric database id within
	/// this transaction.
	///
	/// The id cache is consulted first. On a cache miss the id is read with
	/// a cached prepared statement and then pushed into the cache.
	///
	/// # Errors
	/// Returns a [`DatabaseError`] when preparing or running the query fails;
	/// this includes the case where no row matches, which `query_row` reports
	/// as an error.
	///
	/// # Panics
	/// Panics if acquiring the cache lock fails (the `unwrap()` on
	/// `read()` / `write()`).
	pub fn get_entity_generation_id(
		&self,
		entity_generation_uuid: Uuid,
	) -> Result<EntityGenerationId, DatabaseError> {
		// The message previously named `get_entity_generation()`, which made
		// trace logs point at the wrong function.
		trace!("summary_db_transaction.get_entity_generation_id()");
		if let Some(id) = self.cache.entity_generation_id.read().unwrap().get_id(&entity_generation_uuid) {
			return Ok(id);
		}
		let mut entity_generation_uuid_to_id_statement = self.connection().prepare_cached("
			SELECT entity_generation_id
			FROM entity_generation
			WHERE entity_generation_uuid = ?
		")?;
		let id: EntityGenerationId = entity_generation_uuid_to_id_statement.query_row(
			(entity_generation_uuid,),
			|row| { row.get(0) }
		)?;
		// Remember the mapping for subsequent lookups.
		self.cache.entity_generation_id.write().unwrap().push(id, &entity_generation_uuid);
		return Ok(id);
	}

	/// Inserts a batch of entity generations into the `entity_generation`
	/// table.
	///
	/// For every generation the URL is resolved to a URL id via
	/// `get_url_id(.., true)` (presumably creating missing URL entries —
	/// confirm against `get_url_id`), the URL fragment is stored separately in
	/// `url_fragment`, and the row id of the fresh insert is pushed into the
	/// UUID to id cache.
	///
	/// # Errors
	/// Returns a [`DatabaseError`] when the transaction is not writable, when
	/// a URL id can't be resolved, or when preparing or executing the insert
	/// fails.
	///
	/// # Panics
	/// Panics if acquiring the cache lock fails (the `unwrap()` on `write()`).
	pub fn store_entity_generation_bulk(
		&mut self,
		entity_generations: &[EntityGeneration],
	) -> Result<(), DatabaseError> {

		self.base_transaction.assert_writable("store_entity_generation_bulk")?;

		trace!("summary_db_transaction.store_entity_generation_bulk()");
		// Resolve every URL id once, up front. Previously this vector was
		// filled but never read, and each id was resolved a second time
		// inside the insert loop.
		let mut url_ids: Vec<UrlId> = Vec::with_capacity(entity_generations.len());
		for entity_generation in entity_generations {
			url_ids.push(
				self.base_transaction.get_url_id(&entity_generation.url.clone().into(), true)?
			);
		}
		let mut store_entity_generation_statement = self.connection().prepare_cached("
			INSERT INTO entity_generation (
				url_id,
				first_seen_unix_utc,
				last_seen_unix_utc,
				confirmed_end_unix_utc,
				marked_duplicate,
				entity_generation_uuid,
				url_fragment
			) VALUES (
				?,?,?,?,?,?,?
			)
		")?;
		// `url_ids` holds exactly one entry per generation, in the same order.
		for (entity_generation, url_id) in entity_generations.iter().zip(url_ids) {
			let url_fragment = entity_generation.url.fragment().map(ToString::to_string);
			store_entity_generation_statement.execute((
				url_id,
				entity_generation.first_seen.timestamp(),
				entity_generation.last_seen.timestamp(),
				entity_generation.time_end_confirmed.map(|t| t.timestamp()),
				entity_generation.marked_duplicate,
				entity_generation.uuid,
				url_fragment,
			))?;
			// The row id of the insert that just ran is the new generation id.
			let id = EntityGenerationId::new(self.connection().last_insert_rowid());
			self.cache.entity_generation_id.write().unwrap().push(id, &entity_generation.uuid);
		}
		Ok(())
	}

	/// Sets the confirmed end time on entity generations of the given URLs.
	///
	/// For each `(url, timestamp)` pair, every row with the URL's id, the
	/// same fragment (compared with `is`) and a `last_seen_unix_utc` strictly
	/// before `timestamp` gets `confirmed_end_unix_utc` set to `timestamp`.
	/// URLs without an id in the database are skipped.
	///
	/// NOTE(review): the update has no `confirmed_end_unix_utc IS NULL`
	/// condition, so rows that already carry an end time are overwritten as
	/// well — confirm that this is intended.
	///
	/// # Errors
	/// Returns a [`DatabaseError`] when a URL id lookup fails for a reason
	/// other than the URL being absent, or when preparing or executing the
	/// update fails.
	pub fn close_entity_generation_bulk(
		&mut self,
		url_to_close_at: &[(Url, UtcTimestamp)],
	) -> Result<(), DatabaseError> {
		trace!("summary_db_transaction.close_entity_generation_bulk()");
		// Look up all URL ids before the update statement is prepared.
		let mut known_url_ids: Vec<Option<UrlId>> = Vec::with_capacity(url_to_close_at.len());
		for (url, _) in url_to_close_at {
			let lookup = self.base_transaction.read_url_id(&UrlWithoutFragment::new(url.clone()));
			known_url_ids.push(lookup.optional()?);
		}
		let mut close_statement = self.connection().prepare_cached("
			UPDATE entity_generation
			SET confirmed_end_unix_utc = ?
			WHERE entity_generation.url_id = ?
				AND entity_generation.url_fragment is ?
				AND entity_generation.last_seen_unix_utc < ?
		")?;
		for ((url, timestamp), maybe_url_id) in url_to_close_at.iter().zip(known_url_ids) {
			// This may be called on URLs that aren't in the database yet
			let url_id = match maybe_url_id {
				Some(url_id) => url_id,
				None => continue,
			};
			let timestamp_unix_utc = timestamp.timestamp();
			close_statement.execute((
				timestamp_unix_utc,
				url_id,
				url.fragment(),
				timestamp_unix_utc,
			))?;
		}
		Ok(())
	}

}