mod schema;
use crate::config::{HistoryConfig, HistoryRedactionMode};
use crate::logging::{RedactionConfig, RedactionMode};
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};
use tracing::{debug, error, trace, warn};
pub use schema::{
AgentStat, BackupResult, CURRENT_SCHEMA_VERSION, CheckResult, CommandEntry,
DEFAULT_DB_FILENAME, ExportFilters, ExportOptions, ExportedData, FrequentBlock,
HistoryAnalyzer, HistoryDb, HistoryError, HistoryStats, InteractiveAllowlistAuditEntry,
InteractiveAllowlistOptionType, Outcome, OutcomeStats, PackEffectivenessAnalysis,
PackRecommendation, PathCluster, PatternEffectiveness, PatternStat, PerformanceStats,
PotentialGap, ProjectStat, RecommendationType, RuleMetrics, RuleTrend, StatsTrends,
SuggestionAction, SuggestionAuditEntry, SuggestionCandidate,
};
pub const ENV_HISTORY_DB_PATH: &str = "DCG_HISTORY_DB";
pub const ENV_HISTORY_DISABLED: &str = "DCG_HISTORY_DISABLED";
/// Messages sent from the foreground (`HistoryWriter` / `HistoryFlushHandle`)
/// to the background worker thread.
enum HistoryMessage {
    /// A command entry to persist (boxed to keep the channel message small).
    Entry(Box<CommandEntry>),
    /// Flush all pending entries now; the worker signals completion by
    /// sending `()` on the enclosed channel.
    Flush(mpsc::Sender<()>),
    /// Drain remaining messages, flush, checkpoint the WAL, and exit.
    Shutdown,
}
/// Tuning parameters for the background history worker thread.
#[derive(Clone)]
struct WorkerConfig {
    /// Maximum entries buffered before an immediate flush.
    batch_size: usize,
    /// Maximum time a partially filled batch may wait before flushing.
    flush_interval: Duration,
    /// Whether old entries are pruned automatically by the worker.
    auto_prune: bool,
    /// Entries older than this many days are pruned (only used when
    /// `auto_prune` is enabled).
    retention_days: u32,
    /// Minimum interval between auto-prune eligibility checks.
    prune_check_interval: Duration,
}
impl Default for WorkerConfig {
    /// Conservative defaults: small batches flushed frequently, with
    /// auto-prune off.
    fn default() -> Self {
        Self {
            batch_size: 50,
            // Flush partial batches at least every 100 ms.
            flush_interval: Duration::from_millis(100),
            auto_prune: false,
            // Only meaningful once auto_prune is turned on.
            retention_days: 90,
            // Check at most once per day whether pruning is due.
            prune_check_interval: Duration::from_secs(24 * 3600),
        }
    }
}
impl From<&HistoryConfig> for WorkerConfig {
    /// Derives worker tuning from the user-facing history configuration,
    /// converting milliseconds/hours into `Duration`s.
    fn from(config: &HistoryConfig) -> Self {
        Self {
            batch_size: config.batch_size as usize,
            flush_interval: Duration::from_millis(u64::from(config.batch_flush_interval_ms)),
            auto_prune: config.auto_prune,
            retention_days: config.retention_days,
            prune_check_interval: Duration::from_secs(
                u64::from(config.prune_check_interval_hours) * 3600,
            ),
        }
    }
}
/// Cloneable handle for requesting a synchronous flush of the history
/// worker without holding a reference to the `HistoryWriter` itself.
#[derive(Clone)]
pub struct HistoryFlushHandle {
    // Clone of the writer's channel sender; only used for Flush messages.
    sender: mpsc::Sender<HistoryMessage>,
}
impl HistoryFlushHandle {
    /// Sends a flush request and blocks until the worker acknowledges it,
    /// or until a 10-second timeout elapses — whichever comes first.
    ///
    /// If the worker thread is gone, the send fails and this returns
    /// immediately; errors are intentionally swallowed (best-effort).
    pub fn flush_sync(&self) {
        const FLUSH_TIMEOUT: Duration = Duration::from_secs(10);
        let (ack_tx, ack_rx) = mpsc::channel();
        if self.sender.send(HistoryMessage::Flush(ack_tx)).is_ok() {
            // The timeout guards against a wedged or already-exited worker.
            let _ = ack_rx.recv_timeout(FLUSH_TIMEOUT);
        }
    }
}
/// Asynchronous, non-blocking command-history logger.
///
/// Command text is redacted on the caller's thread, then handed to a
/// dedicated worker thread that batches writes to the history DB.
pub struct HistoryWriter {
    /// `None` when history is disabled; every operation becomes a no-op.
    sender: Option<mpsc::Sender<HistoryMessage>>,
    /// Worker thread handle; joined in `Drop` so entries are not lost.
    handle: Option<thread::JoinHandle<()>>,
    /// How command text is redacted before it is queued for storage.
    redaction_mode: HistoryRedactionMode,
    /// Per-process session id (derived from time/pid/thread) attached to
    /// entries that do not already carry one; empty when disabled.
    session_id: String,
}
impl HistoryWriter {
    /// Creates a writer backed by a dedicated background thread.
    ///
    /// Returns a disabled (no-op) writer when history is off in `config`
    /// or when the worker thread cannot be spawned. Opening the DB happens
    /// on the worker thread, so construction never blocks on storage I/O.
    #[must_use]
    pub fn new(db_path: Option<std::path::PathBuf>, config: &HistoryConfig) -> Self {
        if !config.enabled {
            return Self::disabled();
        }
        let session_id = generate_session_id();
        let (sender, receiver) = mpsc::channel::<HistoryMessage>();
        let worker_config = WorkerConfig::from(config);
        let handle = match thread::Builder::new()
            .name("dcg-history-writer".to_string())
            .spawn(move || {
                // Open the DB here so a slow or failing open never stalls
                // the caller.
                match HistoryDb::open(db_path) {
                    Ok(db) => history_worker(db, receiver, worker_config),
                    Err(e) => {
                        error!(error = %e, "Failed to open history DB in worker thread");
                        // Dropping the receiver makes later sends fail fast.
                        drop(receiver);
                    }
                }
            }) {
            Ok(h) => h,
            Err(e) => {
                error!(
                    error = %e,
                    "Failed to spawn history writer thread - history collection disabled"
                );
                return Self::disabled();
            }
        };
        Self {
            sender: Some(sender),
            handle: Some(handle),
            redaction_mode: config.redaction_mode,
            session_id,
        }
    }

    /// Returns a no-op writer: `log`/`flush` calls are silently ignored.
    #[must_use]
    pub const fn disabled() -> Self {
        Self {
            sender: None,
            handle: None,
            redaction_mode: HistoryRedactionMode::Pattern,
            session_id: String::new(),
        }
    }

    /// The session id stamped onto entries logged through this writer
    /// (empty string for a disabled writer).
    #[must_use]
    pub fn session_id(&self) -> &str {
        &self.session_id
    }

    /// Returns a cloneable handle that can request a synchronous flush,
    /// or `None` when the writer is disabled.
    #[must_use]
    pub fn flush_handle(&self) -> Option<HistoryFlushHandle> {
        self.sender.as_ref().map(|sender| HistoryFlushHandle {
            sender: sender.clone(),
        })
    }

    /// Queues a command entry for asynchronous persistence.
    ///
    /// The command text is redacted per the configured mode, and the
    /// writer's session id is attached when the entry has none.
    pub fn log(&self, mut entry: CommandEntry) {
        // Early-out when disabled: previously the redaction and session-id
        // clone ran unconditionally even though the entry was then dropped.
        let Some(sender) = self.sender.as_ref() else {
            return;
        };
        entry.command = redact_for_history(&entry.command, self.redaction_mode);
        if entry.session_id.is_none() && !self.session_id.is_empty() {
            entry.session_id = Some(self.session_id.clone());
        }
        if let Err(e) = sender.send(HistoryMessage::Entry(Box::new(entry))) {
            warn!(
                error = %e,
                "Failed to send history entry - worker thread unavailable"
            );
        }
    }

    /// Requests an asynchronous flush; returns without waiting.
    pub fn flush(&self) {
        if let Some(sender) = &self.sender {
            // The ack receiver is dropped deliberately: fire-and-forget.
            let (ack_tx, _ack_rx) = mpsc::channel();
            let _ = sender.send(HistoryMessage::Flush(ack_tx));
        }
    }

    /// Flushes and blocks until the worker acknowledges (bounded by the
    /// handle's internal timeout).
    pub fn flush_sync(&self) {
        if let Some(handle) = self.flush_handle() {
            handle.flush_sync();
        }
    }
}
impl Drop for HistoryWriter {
    /// Flushes pending entries, signals shutdown, and joins the worker so
    /// buffered history is not lost when the writer goes out of scope.
    fn drop(&mut self) {
        // Synchronous flush first (bounded by the flush handle's timeout).
        self.flush_sync();
        if let Some(sender) = self.sender.take() {
            let _ = sender.send(HistoryMessage::Shutdown);
        }
        if let Some(handle) = self.handle.take() {
            // A join error (worker panic) is ignored; nothing useful can
            // be done about it inside Drop.
            let _ = handle.join();
        }
    }
}
/// Builds a session id of the form `ses-` + 16 lowercase hex characters.
///
/// The id is the first 8 bytes of a SHA-256 over wall-clock nanoseconds,
/// the process id, and the current thread id, so concurrent sessions get
/// distinct ids with overwhelming probability.
fn generate_session_id() -> String {
    use sha2::{Digest, Sha256};
    use std::fmt::Write as _;

    let nanos = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0);
    let pid = std::process::id();
    let tid_repr = format!("{:?}", thread::current().id());

    let mut hasher = Sha256::new();
    hasher.update(nanos.to_le_bytes());
    hasher.update(pid.to_le_bytes());
    hasher.update(tid_repr.as_bytes());
    let digest = hasher.finalize();

    // "ses-" (4 chars) + 8 bytes * 2 hex chars each = 20 chars total.
    let mut id = String::with_capacity(20);
    id.push_str("ses-");
    for byte in digest.iter().take(8) {
        let _ = write!(id, "{byte:02x}");
    }
    id
}
/// Background worker loop: receives `HistoryMessage`s and persists
/// batched entries to the history DB.
///
/// A batch is flushed when it reaches `config.batch_size` or when
/// `config.flush_interval` elapses. `Flush` requests are acknowledged
/// only after the batch hits the DB; `Shutdown` (or a disconnected
/// channel) triggers a final drain, flush, and WAL checkpoint before the
/// loop exits. After an unrecoverable storage error, `history_disabled`
/// is set and further entries are silently dropped.
#[allow(clippy::needless_pass_by_value)] // thread entry point owns its config
fn history_worker(
    mut db: HistoryDb,
    receiver: mpsc::Receiver<HistoryMessage>,
    config: WorkerConfig,
) {
    let mut batch: Vec<CommandEntry> = Vec::with_capacity(config.batch_size);
    let mut last_flush = Instant::now();
    let mut last_prune_check = Instant::now();
    // Remember the DB path so a fatal storage error can attempt a re-open.
    let db_path = db.path().map(std::path::Path::to_path_buf);
    let mut history_disabled = false;
    if config.auto_prune {
        check_and_prune(&db, config.retention_days);
    }
    loop {
        // Wait only for the remainder of the current flush window so
        // partially filled batches still land on time.
        let timeout = config.flush_interval.saturating_sub(last_flush.elapsed());
        match receiver.recv_timeout(timeout) {
            Ok(HistoryMessage::Entry(entry)) => {
                if history_disabled {
                    continue;
                }
                batch.push(*entry);
                if batch.len() >= config.batch_size {
                    flush_batch_with_recovery(
                        &mut db,
                        &mut batch,
                        db_path.as_ref(),
                        &mut history_disabled,
                    );
                    last_flush = Instant::now();
                }
            }
            Ok(HistoryMessage::Flush(ack)) => {
                let mut pending_acks = vec![ack];
                let mut should_shutdown = false;
                // Drain everything queued behind the flush request so the
                // acknowledgement covers all entries sent before it.
                loop {
                    while let Some(msg) =
                        drain_entries_into_batch(&receiver, &mut batch, config.batch_size)
                    {
                        match msg {
                            HistoryMessage::Flush(pending_ack) => pending_acks.push(pending_ack),
                            HistoryMessage::Shutdown => {
                                should_shutdown = true;
                                break;
                            }
                            // drain_entries_into_batch pushes entries itself.
                            HistoryMessage::Entry(_) => unreachable!(),
                        }
                    }
                    if batch.is_empty() || should_shutdown {
                        break;
                    }
                    // Batch filled mid-drain; write it out and keep draining.
                    flush_batch_with_recovery(
                        &mut db,
                        &mut batch,
                        db_path.as_ref(),
                        &mut history_disabled,
                    );
                }
                // Final flush of whatever remains before acknowledging.
                flush_batch_with_recovery(
                    &mut db,
                    &mut batch,
                    db_path.as_ref(),
                    &mut history_disabled,
                );
                last_flush = Instant::now();
                for pending_ack in pending_acks {
                    // Requesters may have timed out and dropped their end.
                    let _ = pending_ack.send(());
                }
                if should_shutdown {
                    if let Err(e) = db.checkpoint() {
                        warn!(error = %e, "WAL checkpoint failed on shutdown");
                    }
                    break;
                }
            }
            Ok(HistoryMessage::Shutdown) => {
                let mut pending_acks = Vec::new();
                while let Some(msg) =
                    drain_entries_into_batch(&receiver, &mut batch, config.batch_size)
                {
                    match msg {
                        HistoryMessage::Flush(pending_ack) => pending_acks.push(pending_ack),
                        // A duplicate shutdown is harmless; keep draining.
                        HistoryMessage::Shutdown => {}
                        HistoryMessage::Entry(_) => unreachable!(),
                    }
                }
                flush_batch_with_recovery(
                    &mut db,
                    &mut batch,
                    db_path.as_ref(),
                    &mut history_disabled,
                );
                for pending_ack in pending_acks {
                    let _ = pending_ack.send(());
                }
                if let Err(e) = db.checkpoint() {
                    warn!(error = %e, "WAL checkpoint failed during shutdown");
                } else {
                    debug!("WAL checkpoint completed successfully");
                }
                break;
            }
            Err(mpsc::RecvTimeoutError::Timeout) => {
                if !batch.is_empty() {
                    flush_batch_with_recovery(
                        &mut db,
                        &mut batch,
                        db_path.as_ref(),
                        &mut history_disabled,
                    );
                }
                // BUGFIX: reset the flush window even when the batch is
                // empty. Previously `last_flush` was only reset after a
                // flush, so an idle worker's computed timeout decayed to
                // zero and `recv_timeout(ZERO)` returned `Timeout`
                // immediately in a tight loop, busy-spinning a CPU core.
                last_flush = Instant::now();
                if !history_disabled
                    && config.auto_prune
                    && last_prune_check.elapsed() >= config.prune_check_interval
                {
                    check_and_prune(&db, config.retention_days);
                    last_prune_check = Instant::now();
                }
            }
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                // All senders gone without an explicit shutdown message.
                debug!("History channel disconnected, performing final flush");
                flush_batch_with_recovery(
                    &mut db,
                    &mut batch,
                    db_path.as_ref(),
                    &mut history_disabled,
                );
                if let Err(e) = db.checkpoint() {
                    warn!(error = %e, "WAL checkpoint failed on channel disconnect");
                }
                break;
            }
        }
    }
}
/// Attempts to re-open the history DB after a fatal storage error,
/// replacing `*db` on success.
///
/// Returns `true` when a fresh handle was installed; `false` when no
/// path is known (in-memory DB) or the re-open itself failed.
fn recover_history_db(db: &mut HistoryDb, db_path: Option<&std::path::PathBuf>) -> bool {
    let Some(path) = db_path else {
        // Without a path there is nothing to re-open.
        return false;
    };
    match HistoryDb::open(Some(path.clone())) {
        Ok(fresh) => {
            *db = fresh;
            true
        }
        Err(e) => {
            error!(
                error = %e,
                path = %path.display(),
                "Failed to recover history DB after fatal storage error"
            );
            false
        }
    }
}
/// Flushes the batch, attempting a one-shot DB recovery on fatal errors.
///
/// If recovery fails (or is impossible), sets `*history_disabled` so all
/// subsequent entries in this process are dropped instead of retried.
fn flush_batch_with_recovery(
    db: &mut HistoryDb,
    batch: &mut Vec<CommandEntry>,
    db_path: Option<&std::path::PathBuf>,
    history_disabled: &mut bool,
) {
    // Once history is disabled, drop buffered entries silently.
    if *history_disabled {
        batch.clear();
        return;
    }
    // Fast path: the write succeeded (or retried row-by-row internally).
    if flush_batch(db, batch) == FlushOutcome::Success {
        return;
    }
    // Fatal storage error: re-open the DB and try the batch once more.
    warn!("Detected fatal history storage error; attempting DB recovery");
    let recovered = recover_history_db(db, db_path);
    if recovered && flush_batch(db, batch) == FlushOutcome::Success {
        debug!("History DB recovery succeeded");
        return;
    }
    // Recovery failed or was unavailable: give up for this process.
    *history_disabled = true;
    batch.clear();
    if recovered {
        error!(
            "History DB remained unusable after recovery; disabling history writes for this process"
        );
    } else {
        error!(
            "History DB recovery unavailable; disabling history writes for this process"
        );
    }
}
/// Non-blocking drain of queued messages: entries are appended to
/// `batch`; the first control message (`Flush`/`Shutdown`) is returned
/// to the caller.
///
/// Returns `None` when the channel is empty/disconnected or when the
/// batch grows to twice `batch_size` (a safety valve so a flood of
/// entries cannot grow the buffer unboundedly).
fn drain_entries_into_batch(
    receiver: &mpsc::Receiver<HistoryMessage>,
    batch: &mut Vec<CommandEntry>,
    batch_size: usize,
) -> Option<HistoryMessage> {
    let cap = batch_size * 2;
    while let Ok(msg) = receiver.try_recv() {
        match msg {
            HistoryMessage::Entry(entry) => {
                batch.push(*entry);
                if batch.len() >= cap {
                    return None;
                }
            }
            control => return Some(control),
        }
    }
    // Empty or disconnected — either way, nothing more to drain.
    None
}
/// Result of a batch flush attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FlushOutcome {
    /// Batch written (possibly with per-row failures logged); batch cleared.
    Success,
    /// Fatal storage error; the batch is left intact for a retry after
    /// DB recovery.
    Fatal,
}
/// Heuristic check for unrecoverable storage errors based on the error's
/// rendered text: an io_uring failure that panicked or poisoned a lock.
///
/// NOTE(review): this matches on display strings rather than a structured
/// error variant — brittle if the underlying messages change; verify
/// against `HistoryError`'s actual formatting.
fn is_fatal_storage_error(error: &HistoryError) -> bool {
    let text = error.to_string().to_ascii_lowercase();
    if !text.contains("io_uring") {
        return false;
    }
    text.contains("panicked") || text.contains("lock poisoned")
}
/// Writes the buffered batch to the DB, clearing it on success.
///
/// Batches of 2+ entries use a single multi-row insert; if that fails
/// non-fatally, each entry is retried individually so one bad row cannot
/// sink the whole batch. Returns `FlushOutcome::Fatal` as soon as a fatal
/// storage error is detected — the batch is deliberately NOT cleared in
/// that case so the caller can retry it after DB recovery.
fn flush_batch(db: &HistoryDb, batch: &mut Vec<CommandEntry>) -> FlushOutcome {
    if batch.is_empty() {
        return FlushOutcome::Success;
    }
    let batch_len = batch.len();
    trace!(batch_size = batch_len, "Flushing history batch");
    if batch_len >= 2 {
        match db.log_commands_batch(batch) {
            Ok(()) => {
                trace!(count = batch_len, "Batch insert succeeded");
            }
            Err(e) => {
                if is_fatal_storage_error(&e) {
                    error!(
                        error = %e,
                        batch_size = batch_len,
                        "Fatal history storage error in batch insert"
                    );
                    return FlushOutcome::Fatal;
                }
                // Non-fatal batch failure: retry row-by-row so good rows
                // are not lost to one bad one.
                warn!(
                    error = %e,
                    batch_size = batch_len,
                    "Batch insert failed, falling back to individual inserts"
                );
                let mut success_count = 0;
                let mut error_count = 0;
                for entry in batch.iter() {
                    match db.log_command(entry) {
                        Ok(_id) => success_count += 1,
                        Err(insert_err) => {
                            if is_fatal_storage_error(&insert_err) {
                                error!(
                                    error = %insert_err,
                                    command = %entry.command,
                                    "Fatal history storage error in single-row insert"
                                );
                                return FlushOutcome::Fatal;
                            }
                            error_count += 1;
                            // Cap per-row error logs to avoid log spam.
                            if error_count <= 3 {
                                error!(
                                    error = %insert_err,
                                    command = %entry.command,
                                    "Failed to insert history entry"
                                );
                            }
                        }
                    }
                }
                if error_count > 3 {
                    error!(
                        total_errors = error_count,
                        "Additional history insert errors suppressed"
                    );
                }
                if error_count > 0 {
                    warn!(
                        success = success_count,
                        failed = error_count,
                        "History batch recovery completed with errors"
                    );
                }
            }
        }
    } else {
        // Single-entry batch: one plain insert, no multi-row fallback.
        for entry in batch.iter() {
            match db.log_command(entry) {
                Ok(_id) => trace!(command = %entry.command, "Inserted history entry"),
                Err(e) => {
                    if is_fatal_storage_error(&e) {
                        error!(
                            error = %e,
                            command = %entry.command,
                            "Fatal history storage error in single-row insert"
                        );
                        return FlushOutcome::Fatal;
                    }
                    error!(
                        error = %e,
                        command = %entry.command,
                        "Failed to insert history entry"
                    );
                }
            }
        }
    }
    batch.clear();
    FlushOutcome::Success
}
/// Runs auto-prune when the DB reports it is due; best-effort — every
/// failure is logged and swallowed.
fn check_and_prune(db: &HistoryDb, retention_days: u32) {
    // Ask the DB whether enough time has passed since the last prune.
    let due = match db.should_auto_prune() {
        Ok(due) => due,
        Err(e) => {
            warn!(error = %e, "Failed to check if auto-prune is needed");
            return;
        }
    };
    if !due {
        trace!("Auto-prune not needed yet");
        return;
    }
    debug!(retention_days = retention_days, "Starting auto-prune");
    match db.prune_older_than_days(u64::from(retention_days), false) {
        Ok(pruned_count) => {
            debug!(pruned = pruned_count, "Auto-prune completed");
            // Record when we pruned so the next due-check is accurate.
            if let Err(e) = db.record_prune_timestamp() {
                warn!(error = %e, "Failed to record prune timestamp");
            }
        }
        Err(e) => {
            error!(error = %e, "Auto-prune failed");
        }
    }
}
fn redact_for_history(command: &str, mode: HistoryRedactionMode) -> String {
match mode {
HistoryRedactionMode::None => command.to_string(),
HistoryRedactionMode::Full => "[REDACTED]".to_string(),
HistoryRedactionMode::Pattern => {
let config = RedactionConfig {
enabled: true,
mode: RedactionMode::Arguments,
..Default::default()
};
crate::logging::redact_command(command, &config)
}
}
}