unobtanium 3.0.0

Opinionated Web search engine library with crawler and viewer companion.
Documentation
use log::trace;
use rusqlite::OptionalExtension;
use uuid::Uuid;

use crate::database::id::EntityGenerationId;
use crate::database::DatabaseError;
use crate::database::sqlite_helper::*;
use crate::database::summary::structs::SummaryDatabase;
use crate::database::summary::structs::SummaryDatabaseTransaction;
use crate::Origin;
use crate::summary::DuplicateSummary;
use crate::summary::ExactDuplicateGroup;
use crate::time::UtcTimestamp;

use std::collections::HashSet;

impl SummaryDatabase {
	//////////////////////////////////////
	// Duplicate Groups

	/// Looks up which of the given candidates are exact duplicate groups.
	///
	/// For every candidate whose `first_url` yields an [`Origin`], the
	/// database is queried for text piles carrying the candidate's
	/// `blake2b512_digest` on that origin. A group is only returned when
	/// the query's `HAVING COUNT(eg.url_id) > 1` condition holds; the
	/// returned `first_url` is the `MIN(str_url)` of the matching rows.
	///
	/// Candidates for which no origin can be derived are skipped.
	///
	/// # Errors
	/// Returns a [`DatabaseError`] when preparing or running the query,
	/// or reading the origin id, fails.
	pub fn get_exact_duplicate_groups(
		&self,
		candidates: &[ExactDuplicateGroup]
	) -> Result<HashSet<ExactDuplicateGroup>, DatabaseError> {
		trace!("summary_db.get_exact_duplicate_groups()");
		let mut fetch_group = self.connection().prepare("
			SELECT blake2b512_digest, MIN(str_url)
			FROM text_pile
			INNER JOIN entity_generation as eg
				ON eg.text_pile_id = text_pile.text_pile_id
			INNER JOIN url
				ON url.url_id = eg.url_id
			WHERE
				text_pile.blake2b512_digest = ?
				AND url.origin_id = ?
			GROUP BY text_pile.blake2b512_digest, url.origin_id
			HAVING COUNT(eg.url_id) > 1
		")?;
		let mut found_groups = HashSet::new();
		for candidate in candidates {
			// Without an origin there is nothing to scope the lookup to.
			let origin = match Origin::from_url(&candidate.first_url) {
				Some(origin) => origin,
				None => continue,
			};
			let origin_id = self.base().read_origin_id(&origin)?;
			let lookup_params = (candidate.blake2b512_digest.clone(), origin_id);
			let maybe_group = fetch_group
				.query_row(lookup_params, |row| {
					Ok(ExactDuplicateGroup{
						blake2b512_digest: row.get(0)?,
						first_url: row.get(1)?,
					})
				})
				.optional()?;
			// Extending with an `Option` inserts the group only if one was found.
			found_groups.extend(maybe_group);
		}
		Ok(found_groups)
	}

	//////////////////////////////////////
	// Duplicate Summary

	/// Fetches the open duplicate summary of an entity generation.
	///
	/// "Open" means the row's `duplicate_status_end_unix_utc` is `NULL`.
	/// The subject and the duplicate-of entity generation are both
	/// returned as the `entity_generation_uuid` of the joined rows.
	///
	/// Returns `Ok(None)` when no open summary exists for the given
	/// `entity_generation_id`.
	///
	/// # Errors
	/// Returns a [`DatabaseError`] when the query fails.
	pub fn get_current_duplicate_summary(
		&self,
		entity_generation_id: EntityGenerationId
	) -> Result<Option<DuplicateSummary>, DatabaseError> {
		trace!("summary_db.get_current_duplicate_summary()");
		let open_summary = self.connection().query_row(
			"SELECT
				subject_eg.entity_generation_uuid,
				duplicate_of_eg.entity_generation_uuid,
				duplicate_status_start_unix_utc,
				duplicate_status_end_unix_utc
			FROM duplicate_summary
			INNER JOIN entity_generation AS subject_eg ON
				subject_eg.entity_generation_id =
					duplicate_summary.subject_entity_generation_id
			INNER JOIN entity_generation AS duplicate_of_eg ON
				duplicate_of_eg.entity_generation_id =
					duplicate_summary.duplicate_of_entity_generation_id
			WHERE duplicate_summary.subject_entity_generation_id = ?
				AND duplicate_summary.duplicate_status_end_unix_utc is NULL
			",
			(entity_generation_id,),
			|row| {
				let summary = DuplicateSummary{
					subject_entity_generation: row.get(0)?,
					duplicate_of_entity_generation: row.get(1)?,
					duplicate_status_start: from_unix_timestamp_or_epoch(row.get(2)?),
					duplicate_status_end: from_unix_timestamp_opt(row.get(3)?),
				};
				Ok(summary)
			}
		).optional()?;
		Ok(open_summary)
	}

}

impl SummaryDatabaseTransaction<'_> {

	/// Stores new or updated duplicate summaries,
	/// closing previous versions when appropriate.
	///
	/// For every entry in `summaries`:
	/// * If no open record (end time is `NULL`) with the same subject and
	///   the same duplicate-of entity generation exists, every open record
	///   of the subject is closed at the entry's start time, the entry is
	///   inserted, and the subject's `marked_duplicate` flag is set to
	///   whether the new entry is still open.
	/// * If such an open record already exists and the entry carries an end
	///   time, the open record is closed at that time and the subject's
	///   `marked_duplicate` flag is cleared. Without an end time nothing
	///   is changed.
	///
	/// # Errors
	/// Returns a [`DatabaseError`] when resolving an entity generation id
	/// or any of the statements fails.
	pub fn store_duplicate_summary_bulk(
		&mut self,
		summaries: &[DuplicateSummary]
	) -> Result<(), DatabaseError> {
		trace!("summary_db_transaction.store_duplicate_summary_bulk()");
		let mut create_duplicate_summary_statement = self.connection().prepare_cached("
			INSERT INTO duplicate_summary (
				subject_entity_generation_id,
				duplicate_of_entity_generation_id,
				duplicate_status_start_unix_utc,
				duplicate_status_end_unix_utc
			) VALUES (
				?,?,?,?
			)
		")?;
		// Bind order: (end time, subject id).
		let mut close_duplicate_summary_statement = self.connection().prepare_cached("
			UPDATE duplicate_summary
			SET duplicate_status_end_unix_utc = ?
			WHERE subject_entity_generation_id = ? AND
				duplicate_status_end_unix_utc is NULL
		")?;
		// This is used to detect if there is already a database entry,
		// for a given entity generation, that is:
		// * open
		// * has the given subject
		// * is a duplicate of the given entity generation
		let mut test_for_duplicate_summary_statement = self.connection().prepare_cached("
			SELECT COUNT(*)
			FROM duplicate_summary
			WHERE duplicate_status_end_unix_utc is NULL AND
				subject_entity_generation_id = ? AND
				duplicate_of_entity_generation_id = ?
		")?;
		// Bind order: (marked_duplicate flag, entity generation id).
		let mut set_mark_duplicate_statement = self.connection().prepare_cached("
			UPDATE entity_generation
			SET marked_duplicate = ?
			WHERE entity_generation_id = ?
		")?;
		for entry in summaries {
			let subject_id = self.get_entity_generation_id(entry.subject_entity_generation)?;
			let duplicate_of_id = self.get_entity_generation_id(entry.duplicate_of_entity_generation)?;
			let test_res: i64 = test_for_duplicate_summary_statement
				.query_row((subject_id, duplicate_of_id), |row| row.get(0))?;
			if test_res == 0 {
				// There is not already an open record that matches the one we want to register.
				// Close all records that might be open.
				close_duplicate_summary_statement.execute((
					entry.duplicate_status_start.timestamp(),
					subject_id
				))?;
				// And reopen the one we want
				create_duplicate_summary_statement.execute((
					subject_id,
					duplicate_of_id,
					entry.duplicate_status_start.timestamp(),
					to_unix_timestamp_opt(entry.duplicate_status_end)
				))?;
				// The subject only counts as a duplicate while its summary is open.
				set_mark_duplicate_statement.execute((entry.duplicate_status_end.is_none(), subject_id))?;
			} else if let Some(end_time) = entry.duplicate_status_end {
				// If an end time is set for an already existing, but open
				// entry close it immediately.
				close_duplicate_summary_statement.execute((
					end_time.timestamp(),
					subject_id
				))?;
				set_mark_duplicate_statement.execute((false, subject_id))?;
			}
		}
		Ok(())
	}

	/// Takes EntityGeneration UUIDs and closes their open duplicate-summaries.
	///
	/// Every open summary (end time is `NULL`) whose subject is one of
	/// `entity_generations` gets `time_closed` as its end time, and the
	/// entity generation's `marked_duplicate` flag is reset to `0`.
	///
	/// # Errors
	/// Returns a [`DatabaseError`] when resolving an entity generation id
	/// or any of the statements fails.
	pub fn close_duplicate_summary_bulk(
		&mut self,
		entity_generations: &[Uuid],
		time_closed: UtcTimestamp,
	) -> Result<(), DatabaseError> {
		trace!("summary_db_transaction.close_duplicate_summary_bulk()");
		// Bind order: (end time, subject id).
		let mut close_duplicate_summary_statement = self.connection().prepare_cached("
			UPDATE duplicate_summary
			SET duplicate_status_end_unix_utc = ?
			WHERE subject_entity_generation_id = ? AND
				duplicate_status_end_unix_utc is NULL
		")?;
		let mut mark_as_not_duplicate_statement = self.connection().prepare_cached("
			UPDATE entity_generation
			SET marked_duplicate = 0
			WHERE entity_generation_id = ?
		")?;
		for entity_generation_uuid in entity_generations {
			let subject_id = self.get_entity_generation_id(*entity_generation_uuid)?;
			// Positional parameters bind in tuple order: the end time feeds
			// the SET clause, the subject id feeds the WHERE clause.
			close_duplicate_summary_statement.execute((
				time_closed.timestamp(),
				subject_id
			))?;
			mark_as_not_duplicate_statement.execute((subject_id,))?;
		}
		Ok(())
	}


}