use crate::error::StorageError;
use rusqlite::{Connection, Transaction};
use std::sync::Arc;
use tokio::sync::Mutex;
/// Summary of a single purge run: when it ran and how many rows of each
/// signal type were removed (or would be removed, for a dry run).
#[derive(Debug, Clone)]
pub struct PurgeRecord {
    /// Purge start, nanoseconds since the Unix epoch (0 if the clock value
    /// cannot be represented — see `timestamp_nanos_opt().unwrap_or(0)`).
    pub start_time: i64,
    /// Purge end, nanoseconds since the Unix epoch (same 0 fallback).
    pub end_time: i64,
    /// Rows deleted from the `logs` table.
    pub logs_deleted: i64,
    /// Rows deleted from the `spans` table.
    pub spans_deleted: i64,
    /// Rows deleted from the `metrics` table.
    pub metrics_deleted: i64,
}
/// Async mutual-exclusion flag ensuring at most one purge operation runs at
/// a time; acquire it via [`PurgeLock::try_lock`].
pub struct PurgeLock {
    /// `true` while a purge is in progress. Shared with the [`PurgeGuard`]
    /// that resets it on drop.
    locked: Arc<Mutex<bool>>,
}
impl Default for PurgeLock {
    /// Builds a lock in the released (unlocked) state.
    fn default() -> Self {
        let locked = Arc::new(Mutex::new(false));
        Self { locked }
    }
}
impl PurgeLock {
pub fn new() -> Self {
Self::default()
}
pub async fn try_lock(&self) -> Result<PurgeGuard, StorageError> {
let mut locked = self.locked.lock().await;
if *locked {
return Err(StorageError::WriteError(
"Purge operation already in progress".to_string(),
));
}
*locked = true;
Ok(PurgeGuard {
locked: self.locked.clone(),
})
}
}
/// RAII-style guard returned by [`PurgeLock::try_lock`]; clears the shared
/// `locked` flag when dropped so the next purge can acquire the lock.
pub struct PurgeGuard {
    /// Same flag held by the owning [`PurgeLock`].
    locked: Arc<Mutex<bool>>,
}
impl Drop for PurgeGuard {
fn drop(&mut self) {
let locked = self.locked.clone();
tokio::spawn(async move {
let mut lock = locked.lock().await;
*lock = false;
});
}
}
pub fn purge_old_data(
conn: &mut Connection,
cutoff_timestamp: i64,
batch_size: usize,
signal_types: &[crate::SignalType],
dry_run: bool,
) -> Result<PurgeRecord, StorageError> {
let start_time = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0);
let mut logs_deleted = 0i64;
let mut spans_deleted = 0i64;
let mut metrics_deleted = 0i64;
if signal_types.contains(&crate::SignalType::Logs) {
loop {
let deleted = if dry_run {
count_batch(conn, "logs", cutoff_timestamp, batch_size)?
} else {
delete_batch(conn, "logs", cutoff_timestamp, batch_size)?
};
logs_deleted += deleted;
if deleted < batch_size as i64 {
break;
}
}
}
if signal_types.contains(&crate::SignalType::Traces) {
loop {
let deleted = if dry_run {
count_batch(conn, "spans", cutoff_timestamp, batch_size)?
} else {
delete_batch(conn, "spans", cutoff_timestamp, batch_size)?
};
spans_deleted += deleted;
if deleted < batch_size as i64 {
break;
}
}
}
if signal_types.contains(&crate::SignalType::Metrics) {
loop {
let deleted = if dry_run {
count_batch(conn, "metrics", cutoff_timestamp, batch_size)?
} else {
delete_batch(conn, "metrics", cutoff_timestamp, batch_size)?
};
metrics_deleted += deleted;
if deleted < batch_size as i64 {
break;
}
}
}
let end_time = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0);
let record = PurgeRecord {
start_time,
end_time,
logs_deleted,
spans_deleted,
metrics_deleted,
};
if !dry_run {
record_purge_history(conn, &record)?;
}
Ok(record)
}
/// Counts how many rows one deletion batch would cover: the size of the
/// LIMITed id set of rows in `table` older than `cutoff_timestamp`, so the
/// result is capped at `batch_size`.
fn count_batch(
    conn: &Connection,
    table: &str,
    cutoff_timestamp: i64,
    batch_size: usize,
) -> Result<i64, StorageError> {
    // Spans are aged by their start time; every other table has `timestamp`.
    let ts_column = if table == "spans" { "start_time" } else { "timestamp" };
    // `table`/`ts_column` are internal literals, so the format! interpolation
    // cannot inject SQL; cutoff and limit are bound parameters.
    let sql = format!(
        "SELECT COUNT(*) FROM (
SELECT id FROM {} WHERE {} < ? LIMIT ?
)",
        table, ts_column
    );
    let count = conn
        .query_row(
            &sql,
            rusqlite::params![cutoff_timestamp, batch_size],
            |row| row.get::<_, i64>(0),
        )
        .map_err(|e| StorageError::QueryError(format!("Failed to count batch: {}", e)))?;
    Ok(count)
}
/// Deletes one batch of stale rows from `table` inside its own transaction,
/// returning how many rows were removed.
fn delete_batch(
    conn: &mut Connection,
    table: &str,
    cutoff_timestamp: i64,
    batch_size: usize,
) -> Result<i64, StorageError> {
    let tx = match conn.transaction() {
        Ok(tx) => tx,
        Err(e) => {
            return Err(StorageError::WriteError(format!(
                "Failed to start transaction: {}",
                e
            )))
        }
    };
    let removed = delete_batch_in_transaction(&tx, table, cutoff_timestamp, batch_size)?;
    if let Err(e) = tx.commit() {
        return Err(StorageError::WriteError(format!(
            "Failed to commit transaction: {}",
            e
        )));
    }
    Ok(removed)
}
/// Removes up to `batch_size` rows older than the cutoff from `table` within
/// the caller's transaction, reporting how many rows were deleted.
fn delete_batch_in_transaction(
    tx: &Transaction,
    table: &str,
    cutoff_timestamp: i64,
    batch_size: usize,
) -> Result<i64, StorageError> {
    // Spans are aged by their start time; every other table has `timestamp`.
    let timestamp_col = if table == "spans" { "start_time" } else { "timestamp" };
    // `table` is an internal literal ("logs"/"spans"/"metrics"), so the
    // format! interpolation cannot inject SQL. The LIMITed id subquery is
    // what bounds the batch.
    let sql = format!(
        "DELETE FROM {} WHERE id IN (
SELECT id FROM {} WHERE {} < ? LIMIT ?
)",
        table, table, timestamp_col
    );
    let rows = tx
        .execute(&sql, rusqlite::params![cutoff_timestamp, batch_size])
        .map_err(|e| StorageError::WriteError(format!("Failed to delete batch: {}", e)))?;
    Ok(rows as i64)
}
/// Persists one completed purge run into the `purge_history` table.
fn record_purge_history(conn: &Connection, record: &PurgeRecord) -> Result<(), StorageError> {
    let params = rusqlite::params![
        record.start_time,
        record.end_time,
        record.logs_deleted,
        record.spans_deleted,
        record.metrics_deleted,
    ];
    conn.execute(
        "INSERT INTO purge_history (start_time, end_time, logs_deleted, spans_deleted, metrics_deleted)
VALUES (?, ?, ?, ?, ?)",
        params,
    )
    .map(|_| ())
    .map_err(|e| StorageError::WriteError(format!("Failed to record purge history: {}", e)))
}
/// Runs SQLite's `VACUUM` to rebuild the database file and reclaim free pages.
pub fn vacuum(conn: &mut Connection) -> Result<(), StorageError> {
    match conn.execute_batch("VACUUM") {
        Ok(()) => Ok(()),
        Err(e) => Err(StorageError::WriteError(format!(
            "Failed to vacuum database: {}",
            e
        ))),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;

    /// The lock admits a single holder; once the guard drops, it can be
    /// acquired again.
    #[test]
    fn test_purge_lock() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async {
            let lock = PurgeLock::new();
            let first = lock.try_lock().await;
            assert!(first.is_ok());
            // While the first guard is alive, a second acquisition fails.
            assert!(lock.try_lock().await.is_err());
            drop(first);
            // Release may happen on a spawned task, so yield briefly before
            // asserting the lock is free again.
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            assert!(lock.try_lock().await.is_ok());
        });
    }

    /// delete_batch removes at most `batch_size` rows below the cutoff.
    #[test]
    fn test_delete_batch() {
        let mut conn = Connection::open_in_memory().unwrap();
        conn.execute(
            "CREATE TABLE logs (
id INTEGER PRIMARY KEY,
timestamp INTEGER NOT NULL,
data TEXT
)",
            [],
        )
        .unwrap();
        // Rows at timestamps 0, 1000, ..., 14000: exactly ten fall below the
        // 10000 cutoff used below.
        for idx in 0..15 {
            conn.execute(
                "INSERT INTO logs (timestamp, data) VALUES (?, ?)",
                rusqlite::params![idx * 1000, format!("log {}", idx)],
            )
            .unwrap();
        }
        let removed = delete_batch(&mut conn, "logs", 10000, 10).unwrap();
        assert_eq!(removed, 10);
        let remaining: i64 = conn
            .query_row("SELECT COUNT(*) FROM logs", [], |row| row.get(0))
            .unwrap();
        assert_eq!(remaining, 5);
    }

    /// VACUUM succeeds on a fresh in-memory database.
    #[test]
    fn test_vacuum() {
        let mut conn = Connection::open_in_memory().unwrap();
        assert!(vacuum(&mut conn).is_ok());
    }
}