use fxhash::{FxHashMap as HashMap, FxHashSet as HashSet};
use std::time::{Duration, Instant};
use crate::comments::{filter_rules, find_comment_action};
use crate::error::ContextualError;
use chrono::{DateTime, Utc};
use itertools::Itertools;
use postgres::types::Oid;
use postgres::Transaction;
use crate::hint_data::HintId;
use crate::hints;
use crate::output::output_format::Hint;
use crate::pg_types::locks::{Lock, LockableTarget};
use crate::tracing::queries;
use crate::tracing::queries::{
ColumnIdentifier, ColumnMetadata, Constraint, ForeignKeyReference, RelfileId,
};
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct SqlStatementTrace {
pub(crate) sql: String,
pub(crate) locks_taken: Vec<Lock>,
pub(crate) start_time: Instant,
pub(crate) duration: Duration,
pub(crate) added_columns: Vec<(ColumnIdentifier, ColumnMetadata)>,
pub(crate) modified_columns: Vec<(ColumnIdentifier, ModifiedColumn)>,
pub(crate) added_constraints: Vec<Constraint>,
pub(crate) modified_constraints: Vec<(Oid, ModifiedConstraint)>,
pub(crate) created_objects: Vec<LockableTarget>,
pub(crate) lock_timeout_millis: u64,
pub(crate) rewritten_objects: Vec<RelfileId>,
pub(crate) line_no: usize,
pub(crate) fks_missing_index: Vec<ForeignKeyReference>,
}
#[derive(Eq, PartialEq, Debug, Clone)]
pub struct ModifiedColumn {
pub(crate) old: ColumnMetadata,
pub(crate) new: ColumnMetadata,
}
#[derive(Eq, PartialEq, Debug, Clone)]
pub struct ModifiedConstraint {
pub(crate) old: Constraint,
pub(crate) new: Constraint,
}
#[derive(Eq, PartialEq, Debug, Clone)]
pub struct TxLockTracer<'a> {
pub(crate) name: Option<String>,
pub(crate) initial_objects: HashSet<Oid>,
pub(crate) statements: Vec<SqlStatementTrace>,
pub(crate) triggered_hints: Vec<Vec<Hint>>,
pub(crate) all_locks: HashSet<Lock>,
pub(crate) trace_start: DateTime<Utc>,
pub(crate) columns: HashMap<ColumnIdentifier, ColumnMetadata>,
pub(crate) constraints: HashMap<Oid, Constraint>,
pub(crate) concurrent: bool,
pub(crate) created_objects: HashSet<Oid>,
pub(crate) relfile_ids: HashMap<Oid, u32>,
pub(crate) ignored_hints: &'a [&'a str],
}
pub struct StatementCtx<'a> {
pub(crate) sql_statement_trace: &'a SqlStatementTrace,
pub(crate) transaction: &'a TxLockTracer<'a>,
}
impl StatementCtx<'_> {
pub fn new_constraints(&self) -> impl Iterator<Item = &Constraint> {
self.sql_statement_trace.added_constraints.iter()
}
pub fn altered_columns(&self) -> impl Iterator<Item = &(ColumnIdentifier, ModifiedColumn)> {
self.sql_statement_trace.modified_columns.iter()
}
pub fn new_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
self.sql_statement_trace
.added_columns
.iter()
.map(|(_, col)| col)
}
pub fn locks_at_start(&self) -> impl Iterator<Item = &Lock> {
self.transaction.all_locks.iter()
}
pub fn new_locks_taken(&self) -> impl Iterator<Item = &Lock> {
self.sql_statement_trace.locks_taken.iter()
}
pub fn new_objects(&self) -> impl Iterator<Item = &LockableTarget> {
self.sql_statement_trace.created_objects.iter()
}
pub fn lock_timeout_millis(&self) -> u64 {
self.sql_statement_trace.lock_timeout_millis
}
pub fn constraints_on(&self, oid: Oid) -> impl Iterator<Item = &Constraint> {
self.transaction
.constraints
.values()
.filter(move |con| con.target == oid)
}
pub fn rewritten_objects(&self) -> impl Iterator<Item = &RelfileId> {
self.sql_statement_trace.rewritten_objects.iter()
}
}
impl<'a> TxLockTracer<'a> {
pub fn success(&self) -> bool {
self.triggered_hints.iter().all(|hints| hints.is_empty())
}
pub fn trace_sql_statement(
&mut self,
tx: &mut Transaction,
sql: (usize, &str),
skip_this: bool,
final_checks: bool,
) -> crate::Result<()> {
let start_time = Instant::now();
let oid_vec = self.initial_objects.iter().copied().collect_vec();
let lock_timeout = queries::get_lock_timeout(tx)?;
if !skip_this {
tx.execute(sql.1, &[]).map_err(|err| {
let context = format!("Error while executing SQL statement: {err:?}: {}", sql.1);
err.with_context(context)
})?;
}
let duration = start_time.elapsed();
let locks_taken =
queries::find_relevant_locks_in_current_transaction(tx, &self.initial_objects)?;
let new_locks = queries::find_new_locks(&self.all_locks, &locks_taken);
let relfile_ids = queries::fetch_all_rel_file_ids(tx, &oid_vec)?;
let changed_ids: Vec<_> = relfile_ids
.into_iter()
.filter(|(oid, id)| self.relfile_ids.get(oid) != Some(&id.relfilenode))
.map(|(_, id)| id)
.collect();
self.relfile_ids
.extend(changed_ids.iter().map(|id| (id.oid, id.relfilenode)));
let columns = queries::fetch_all_columns(tx, &oid_vec)?;
let mut added_columns = Vec::new();
let mut modified_columns = Vec::new();
for (col_id, col) in columns.iter() {
if let Some(pre_existing) = self.columns.get(col_id) {
if pre_existing != col {
modified_columns.push((
*col_id,
ModifiedColumn {
new: col.clone(),
old: pre_existing.clone(),
},
));
}
} else {
added_columns.push((*col_id, col.clone()));
}
}
self.columns = columns;
let constraints = queries::fetch_constraints(tx, &oid_vec)?;
let mut added_constraints = Vec::new();
let mut modified_constraints = Vec::new();
for (conid, con) in constraints.iter() {
if let Some(pre_existing) = self.constraints.get(conid) {
if pre_existing != con {
modified_constraints.push((
*conid,
ModifiedConstraint {
old: pre_existing.clone(),
new: con.clone(),
},
));
}
} else {
added_constraints.push(con.clone());
}
}
self.constraints = constraints;
let new_objects: Vec<_> = queries::fetch_lockable_objects(tx, &oid_vec)?
.into_iter()
.filter(|target| !self.created_objects.contains(&target.oid))
.collect();
self.created_objects
.extend(new_objects.iter().map(|obj| obj.oid));
let statement = SqlStatementTrace {
sql: sql.1.to_string(),
locks_taken: new_locks.into_iter().collect(),
start_time,
duration,
added_columns,
modified_columns,
added_constraints,
modified_constraints,
created_objects: new_objects,
lock_timeout_millis: lock_timeout,
rewritten_objects: changed_ids,
line_no: sql.0,
fks_missing_index: if final_checks {
queries::fks_missing_index(tx)?
} else {
Vec::new()
},
};
let ctx = StatementCtx {
sql_statement_trace: &statement,
transaction: self,
};
let hint_action = find_comment_action(sql.1)?;
let hints: Vec<_> = filter_rules(
&hint_action,
hints::all_hints()
.iter()
.filter(|hint| !self.ignored_hints.contains(&hint.id())),
)
.filter_map(|hint| hint.check(&ctx))
.collect();
self.triggered_hints.push(hints);
self.statements.push(statement);
self.all_locks.extend(locks_taken.iter().cloned());
Ok(())
}
pub fn new(
name: Option<String>,
trace_targets: HashSet<Oid>,
columns: HashMap<ColumnIdentifier, ColumnMetadata>,
constraints: HashMap<Oid, Constraint>,
relfile_ids: HashMap<Oid, u32>,
ignored_hints: &'a [&'a str],
) -> Self {
Self {
name,
initial_objects: trace_targets,
statements: vec![],
all_locks: HashSet::default(),
trace_start: Utc::now(),
columns,
constraints,
concurrent: false,
created_objects: Default::default(),
triggered_hints: vec![],
relfile_ids,
ignored_hints,
}
}
pub fn tracer_for_concurrently<S: AsRef<str>>(
name: Option<String>,
statements: impl Iterator<Item = (usize, S)>,
ignored_hints: &'a [&'a str],
) -> Self {
let mut out = Self {
name,
initial_objects: HashSet::default(),
statements: statements
.map(|(line, s)| SqlStatementTrace {
sql: s.as_ref().to_string(),
locks_taken: vec![],
start_time: Instant::now(),
duration: Duration::from_secs(0),
added_columns: vec![],
modified_columns: vec![],
added_constraints: vec![],
modified_constraints: vec![],
created_objects: vec![],
lock_timeout_millis: 0,
rewritten_objects: vec![],
line_no: line,
fks_missing_index: Vec::new(),
})
.collect(),
all_locks: HashSet::default(),
trace_start: Utc::now(),
columns: HashMap::default(),
constraints: HashMap::default(),
concurrent: true,
created_objects: Default::default(),
triggered_hints: vec![],
relfile_ids: Default::default(),
ignored_hints,
};
out.triggered_hints = vec![vec![]; out.statements.len()];
out
}
}