use log::trace;
use rusqlite::OptionalExtension;
use uuid::Uuid;
use crate::database::id::EntityGenerationId;
use crate::database::DatabaseError;
use crate::database::sqlite_helper::*;
use crate::database::summary::structs::SummaryDatabase;
use crate::database::summary::structs::SummaryDatabaseTransaction;
use crate::Origin;
use crate::summary::DuplicateSummary;
use crate::summary::ExactDuplicateGroup;
use crate::time::UtcTimestamp;
use std::collections::HashSet;
impl SummaryDatabase {
    /// Determines which of the given `candidates` are confirmed exact-duplicate
    /// groups: a candidate is confirmed when its BLAKE2b-512 digest is shared
    /// by more than one URL within the origin of the candidate's `first_url`.
    ///
    /// Candidates whose `first_url` does not yield an `Origin` via
    /// `Origin::from_url` are silently skipped. For confirmed groups the
    /// returned `first_url` is the group's `MIN(str_url)`.
    ///
    /// # Errors
    /// Returns a `DatabaseError` if statement preparation, origin-id lookup,
    /// or any per-candidate query fails.
    pub fn get_exact_duplicate_groups(
        &self,
        candidates: &[ExactDuplicateGroup]
    ) -> Result<HashSet<ExactDuplicateGroup>, DatabaseError> {
        trace!("summary_db.get_exact_duplicate_groups()");
        let mut duplicate_groups: HashSet<ExactDuplicateGroup> = HashSet::new();
        // Prepared once and re-bound for every candidate in the loop below.
        let mut duplicate_group_fetch_statement = self.connection().prepare("
SELECT blake2b512_digest, MIN(str_url)
FROM text_pile
INNER JOIN entity_generation as eg
ON eg.text_pile_id = text_pile.text_pile_id
INNER JOIN url
ON url.url_id = eg.url_id
WHERE
text_pile.blake2b512_digest = ?
AND url.origin_id = ?
GROUP BY text_pile.blake2b512_digest, url.origin_id
HAVING COUNT(eg.url_id) > 1
")?;
        for candidate in candidates {
            if let Some(origin) = Origin::from_url(&candidate.first_url) {
                let origin_id = self.base().read_origin_id(&origin)?;
                // Bind the digest by reference — rusqlite implements ToSql for
                // references, so no per-candidate clone is needed.
                let duplicate_group_opt = duplicate_group_fetch_statement
                    .query_row(
                        (&candidate.blake2b512_digest, origin_id),
                        |row| {
                            Ok(ExactDuplicateGroup {
                                blake2b512_digest: row.get(0)?,
                                first_url: row.get(1)?,
                            })
                        },
                    )
                    .optional()?;
                if let Some(group) = duplicate_group_opt {
                    duplicate_groups.insert(group);
                }
            }
        }
        Ok(duplicate_groups)
    }

    /// Fetches the currently open duplicate summary (the row whose end
    /// timestamp is NULL) for the given subject entity generation.
    ///
    /// Returns `Ok(None)` when no open summary exists for the subject;
    /// `.optional()` converts rusqlite's "no rows" error into `None`.
    ///
    /// # Errors
    /// Returns a `DatabaseError` for any query failure other than the query
    /// returning no rows.
    pub fn get_current_duplicate_summary(
        &self,
        entity_generation_id: EntityGenerationId
    ) -> Result<Option<DuplicateSummary>, DatabaseError> {
        trace!("summary_db.get_current_duplicate_summary()");
        self.connection().query_row(
            "SELECT
subject_eg.entity_generation_uuid,
duplicate_of_eg.entity_generation_uuid,
duplicate_status_start_unix_utc,
duplicate_status_end_unix_utc
FROM duplicate_summary
INNER JOIN entity_generation AS subject_eg ON
subject_eg.entity_generation_id =
duplicate_summary.subject_entity_generation_id
INNER JOIN entity_generation AS duplicate_of_eg ON
duplicate_of_eg.entity_generation_id =
duplicate_summary.duplicate_of_entity_generation_id
WHERE duplicate_summary.subject_entity_generation_id = ?
AND duplicate_summary.duplicate_status_end_unix_utc is NULL
",
            (entity_generation_id,),
            |row| {
                Ok(DuplicateSummary {
                    subject_entity_generation: row.get(0)?,
                    duplicate_of_entity_generation: row.get(1)?,
                    // Helper name indicates an epoch fallback for unparsable
                    // start timestamps — TODO confirm against its definition.
                    duplicate_status_start: from_unix_timestamp_or_epoch(row.get(2)?),
                    duplicate_status_end: from_unix_timestamp_opt(row.get(3)?),
                })
            },
        )
        .optional()
        .map_err(Into::into)
    }
}
impl SummaryDatabaseTransaction<'_> {
    /// Records the given duplicate summaries within the current transaction.
    ///
    /// For each entry:
    /// - If no open (end-is-NULL) summary already links the subject to the
    ///   same duplicate-of generation: any other open summary for the subject
    ///   is closed at the entry's start time, a fresh summary row is inserted,
    ///   and the subject's `marked_duplicate` flag is set to whether the entry
    ///   is still open (`duplicate_status_end` is `None`).
    /// - If such an open summary already exists: only an entry carrying an end
    ///   time has any effect — the open summary is closed at that end time and
    ///   the subject's `marked_duplicate` flag is cleared.
    ///
    /// # Errors
    /// Returns a `DatabaseError` if statement preparation, UUID-to-id
    /// resolution, or any statement execution fails.
    pub fn store_duplicate_summary_bulk(
        &mut self,
        summaries: &[DuplicateSummary]
    ) -> Result<(), DatabaseError> {
        trace!("summary_db_transaction.store_duplicate_summary_bulk()");
        let mut create_duplicate_summary_statement = self.connection().prepare_cached("
INSERT INTO duplicate_summary (
subject_entity_generation_id,
duplicate_of_entity_generation_id,
duplicate_status_start_unix_utc,
duplicate_status_end_unix_utc
) VALUES (
?,?,?,?
)
")?;
        // Positional binding order for this statement: (end timestamp, subject id).
        let mut close_duplicate_summary_statement = self.connection().prepare_cached("
UPDATE duplicate_summary
SET duplicate_status_end_unix_utc = ?
WHERE subject_entity_generation_id = ? AND
duplicate_status_end_unix_utc is NULL
")?;
        let mut test_for_duplicate_summary_statement = self.connection().prepare_cached("
SELECT COUNT(*)
FROM duplicate_summary
WHERE duplicate_status_end_unix_utc is NULL AND
subject_entity_generation_id = ? AND
duplicate_of_entity_generation_id = ?
")?;
        let mut set_mark_duplicate_statement = self.connection().prepare_cached("
UPDATE entity_generation
SET marked_duplicate = ?
WHERE entity_generation_id = ?
")?;
        for entry in summaries {
            let subject_id = self.get_entity_generation_id(entry.subject_entity_generation)?;
            let duplicate_of_id = self.get_entity_generation_id(entry.duplicate_of_entity_generation)?;
            // Number of already-open summaries linking this exact pair.
            let open_link_count: i64 = test_for_duplicate_summary_statement
                .query_row((subject_id, duplicate_of_id), |row| row.get(0))?;
            if open_link_count == 0 {
                // Close any open summary pointing at a *different*
                // duplicate-of generation before inserting the new one.
                close_duplicate_summary_statement.execute((
                    entry.duplicate_status_start.timestamp(),
                    subject_id
                ))?;
                create_duplicate_summary_statement.execute((
                    subject_id,
                    duplicate_of_id,
                    entry.duplicate_status_start.timestamp(),
                    to_unix_timestamp_opt(entry.duplicate_status_end)
                ))?;
                // An entry without an end time is a still-active duplicate.
                set_mark_duplicate_statement.execute((
                    entry.duplicate_status_end.is_none(),
                    subject_id
                ))?;
            } else if let Some(end_time) = entry.duplicate_status_end {
                close_duplicate_summary_statement.execute((
                    end_time.timestamp(),
                    subject_id
                ))?;
                set_mark_duplicate_statement.execute((false, subject_id))?;
            }
        }
        Ok(())
    }

    /// Closes every open duplicate summary for the given entity generations at
    /// `time_closed` and clears each generation's `marked_duplicate` flag.
    ///
    /// # Errors
    /// Returns a `DatabaseError` if statement preparation, UUID-to-id
    /// resolution, or any statement execution fails.
    pub fn close_duplicate_summary_bulk(
        &mut self,
        entity_generations: &[Uuid],
        time_closed: UtcTimestamp,
    ) -> Result<(), DatabaseError> {
        trace!("summary_db_transaction.close_duplicate_summary_bulk()");
        // Positional binding order: (end timestamp, subject id).
        let mut close_duplicate_summary_statement = self.connection().prepare_cached("
UPDATE duplicate_summary
SET duplicate_status_end_unix_utc = ?
WHERE subject_entity_generation_id = ? AND
duplicate_status_end_unix_utc is NULL
")?;
        let mut mark_as_not_duplicate_statement = self.connection().prepare_cached("
UPDATE entity_generation
SET marked_duplicate = 0
WHERE entity_generation_id = ?
")?;
        for entity_generation_uuid in entity_generations {
            let subject_id = self.get_entity_generation_id(*entity_generation_uuid)?;
            // BUG FIX: the arguments were previously passed as
            // (subject_id, timestamp), so the subject id was written into
            // duplicate_status_end_unix_utc and the timestamp was matched
            // against subject_entity_generation_id in the WHERE clause.
            // Positional `?` parameters must follow the SQL order:
            // timestamp first, then subject id.
            close_duplicate_summary_statement.execute((
                time_closed.timestamp(),
                subject_id
            ))?;
            mark_as_not_duplicate_statement.execute((subject_id,))?;
        }
        Ok(())
    }
}