use super::aggregates::FileAggregateSet;
use super::metadata::{ContentHash, FileMetadata, ObservedFile};
use super::{bool_to_i64, i64_to_u64, raw_usage_from_options, u64_to_i64, usage_from_i64};
use crate::app::model::UsageTotals;
use crate::app::report::SessionScanTarget;
use crate::app::session_files::SessionFileFormat;
use crate::app::session_log::SessionParseCheckpoint;
use eyre::Result;
use rusqlite::{Connection, Transaction, params, params_from_iter};
use std::collections::HashMap;
/// Parser version written into the `parser_version` column of every `files` row produced by
/// this module. Stored rows carrying a different value are reported as incompatible by
/// `StoredFileRecord::is_compatible_with`.
const PARSER_VERSION: i64 = 1;
/// Result of scanning one session target.
///
/// Holds the target, the aggregates computed for it, and (when the scan produced one) the
/// cache entry that can be written to the `files` table.
#[derive(Clone)]
pub(super) struct ScannedFile {
    pub(super) target: SessionScanTarget,
    pub(super) aggregates: FileAggregateSet,
    pub(super) cache_entry: Option<ScannedCacheEntry>,
}

impl ScannedFile {
    /// Consumes this scan result and returns it with `aggregates` swapped in.
    ///
    /// The target and cache entry are carried over unchanged.
    pub(super) fn with_aggregates(self, aggregates: FileAggregateSet) -> Self {
        ScannedFile {
            target: self.target,
            aggregates,
            cache_entry: self.cache_entry,
        }
    }
}
#[derive(Clone)]
pub(super) struct ScannedCacheEntry {
pub(super) session_key: String,
path: String,
metadata: FileMetadata,
checkpoint: SessionParseCheckpoint,
content_hash: ContentHash,
}
impl ScannedCacheEntry {
pub(super) fn from_scan(
target: &SessionScanTarget,
checkpoint: &SessionParseCheckpoint,
content_hash: ContentHash,
) -> Self {
let observed = ObservedFile::from_target(target);
let mut metadata = observed.metadata;
metadata.size = checkpoint.offset;
Self {
session_key: target.session_id.clone(),
path: observed.path_key,
metadata,
checkpoint: checkpoint.clone(),
content_hash,
}
}
}
/// A validated row of the `files` table, as produced by
/// `RawStoredFileRecord::into_valid_record`.
#[derive(Clone)]
pub(super) struct StoredFileRecord {
    pub(super) session_key: String,
    pub(super) path: String,
    pub(super) generation: i64,
    parser_version: i64,
    pub(super) metadata: FileMetadata,
    pub(super) checkpoint: SessionParseCheckpoint,
    pub(super) content_hash: ContentHash,
    pub(super) total: UsageTotals,
    pub(super) fallback_total: UsageTotals,
}

impl StoredFileRecord {
    /// Returns `true` when this row was written with the current `PARSER_VERSION` and refers to
    /// the same path key as `observed`.
    pub(super) fn is_compatible_with(&self, observed: &ObservedFile) -> bool {
        let written_by_current_parser = self.parser_version == PARSER_VERSION;
        written_by_current_parser && self.path == observed.path_key
    }

    /// Returns `true` when `observed` qualifies as an append to this stored file.
    ///
    /// All of these must hold:
    /// - both sides are `SessionFileFormat::Plain`;
    /// - the observed size is strictly larger than the stored size;
    /// - the stored checkpoint offset lies within the stored size;
    /// - `FileMetadata::same_identity_as` accepts the observed metadata.
    ///
    /// Presumably this lets parsing resume from the stored checkpoint (confirm with callers).
    pub(super) fn can_append_to(&self, observed: &ObservedFile) -> bool {
        let plain = SessionFileFormat::Plain;
        if self.metadata.file_format != plain || observed.file_format != plain {
            return false;
        }
        let file_grew = observed.metadata.size > self.metadata.size;
        let checkpoint_in_bounds = self.checkpoint.offset <= self.metadata.size;
        file_grew && checkpoint_in_bounds && self.metadata.same_identity_as(&observed.metadata)
    }
}
/// One row of the `files` table exactly as SQLite returns it, before any validation.
pub(super) struct RawStoredFileRecord {
    session_key: String,
    path: String,
    generation: i64,
    parser_version: i64,
    file_format: String,
    size: i64,
    mtime_ns: Option<i64>,
    dev: Option<i64>,
    ino: Option<i64>,
    ctime_ns: Option<i64>,
    checkpoint_offset: i64,
    previous_input: Option<i64>,
    previous_cached_input: Option<i64>,
    previous_output: Option<i64>,
    previous_reasoning_output: Option<i64>,
    previous_total: Option<i64>,
    current_model: Option<String>,
    current_model_is_fallback: i64,
    content_hash: String,
    total_input: i64,
    total_cached_input: i64,
    total_output: i64,
    total_reasoning_output: i64,
    total_tokens: i64,
    fallback_input: i64,
    fallback_cached_input: i64,
    fallback_output: i64,
    fallback_reasoning_output: i64,
    fallback_total: i64,
}

impl RawStoredFileRecord {
    /// Reads one row by column position.
    ///
    /// The indices follow the order of `FILE_RECORD_COLUMNS`, which every `SELECT` in this
    /// module uses.
    pub(super) fn from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<Self> {
        Ok(Self {
            session_key: row.get(0)?,
            path: row.get(1)?,
            generation: row.get(2)?,
            parser_version: row.get(3)?,
            file_format: row.get(4)?,
            size: row.get(5)?,
            mtime_ns: row.get(6)?,
            dev: row.get(7)?,
            ino: row.get(8)?,
            ctime_ns: row.get(9)?,
            checkpoint_offset: row.get(10)?,
            previous_input: row.get(11)?,
            previous_cached_input: row.get(12)?,
            previous_output: row.get(13)?,
            previous_reasoning_output: row.get(14)?,
            previous_total: row.get(15)?,
            current_model: row.get(16)?,
            current_model_is_fallback: row.get(17)?,
            content_hash: row.get(18)?,
            total_input: row.get(19)?,
            total_cached_input: row.get(20)?,
            total_output: row.get(21)?,
            total_reasoning_output: row.get(22)?,
            total_tokens: row.get(23)?,
            fallback_input: row.get(24)?,
            fallback_cached_input: row.get(25)?,
            fallback_output: row.get(26)?,
            fallback_reasoning_output: row.get(27)?,
            fallback_total: row.get(28)?,
        })
    }

    /// Validates the raw row and converts it into a `StoredFileRecord`.
    ///
    /// Returns `None` when any of these hold:
    /// - `i64_to_u64` rejects the checkpoint offset or the size;
    /// - the file format string is not recognised by `SessionFileFormat::from_str`;
    /// - the checkpoint offset differs from the size;
    /// - `raw_usage_from_options` rejects the previous totals;
    /// - the generation is not positive;
    /// - the content hash does not decode;
    /// - `usage_from_i64` rejects either usage total.
    pub(super) fn into_valid_record(self) -> Option<StoredFileRecord> {
        let offset = i64_to_u64(self.checkpoint_offset)?;
        let file_format = SessionFileFormat::from_str(&self.file_format)?;
        let size = i64_to_u64(self.size)?;
        // Writers always store the size equal to the checkpoint offset; a row where the two
        // differ is not trusted.
        if offset != size {
            return None;
        }
        let previous_totals = raw_usage_from_options(
            self.previous_input,
            self.previous_cached_input,
            self.previous_output,
            self.previous_reasoning_output,
            self.previous_total,
        )
        .ok()?;
        let generation = Some(self.generation).filter(|&value| value > 0)?;
        let content_hash = ContentHash::decode(&self.content_hash)?;
        let total = usage_from_i64(
            self.total_input,
            self.total_cached_input,
            self.total_output,
            self.total_reasoning_output,
            self.total_tokens,
        )?;
        let fallback_total = usage_from_i64(
            self.fallback_input,
            self.fallback_cached_input,
            self.fallback_output,
            self.fallback_reasoning_output,
            self.fallback_total,
        )?;

        let metadata = FileMetadata {
            file_format,
            size,
            mtime_ns: self.mtime_ns,
            dev: self.dev,
            ino: self.ino,
            ctime_ns: self.ctime_ns,
        };
        let checkpoint = SessionParseCheckpoint {
            offset,
            previous_totals,
            current_model: self.current_model,
            current_model_is_fallback: self.current_model_is_fallback != 0,
        };
        Some(StoredFileRecord {
            session_key: self.session_key,
            path: self.path,
            generation,
            parser_version: self.parser_version,
            metadata,
            checkpoint,
            content_hash,
            total,
            fallback_total,
        })
    }
}
/// Maximum number of session keys that `load_file_records_for_keys` binds into a single
/// `IN (...)` query.
// NOTE(review): presumably chosen to stay below SQLite's host-parameter limit
// (SQLITE_MAX_VARIABLE_NUMBER, 999 in builds before 3.32.0) — confirm.
const QUERY_KEY_CHUNK_SIZE: usize = 900;
/// Column list selected from `files`.
///
/// The order must match the positional indices read by `RawStoredFileRecord::from_row`. Keep
/// the two in sync whenever the schema changes.
const FILE_RECORD_COLUMNS: &str = "session_key, path, generation, parser_version, file_format, \
size, mtime_ns, dev, ino, ctime_ns, checkpoint_offset, previous_input, previous_cached_input, \
previous_output, previous_reasoning_output, previous_total, current_model, \
current_model_is_fallback, content_hash, total_input, total_cached_input, \
total_output, total_reasoning_output, total_tokens, fallback_input, \
fallback_cached_input, fallback_output, fallback_reasoning_output, fallback_total";
/// Loads every row of the `files` table, keyed by session key.
///
/// Rows rejected by `RawStoredFileRecord::into_valid_record` are skipped silently. SQLite
/// errors are propagated.
pub(super) fn load_file_records(
    connection: &Connection,
) -> Result<HashMap<String, StoredFileRecord>> {
    let sql = format!("SELECT {FILE_RECORD_COLUMNS} FROM files");
    let mut statement = connection.prepare_cached(&sql)?;
    let mut by_session = HashMap::new();
    for raw in statement.query_map([], RawStoredFileRecord::from_row)? {
        let Some(record) = raw?.into_valid_record() else {
            continue;
        };
        by_session.insert(record.session_key.clone(), record);
    }
    Ok(by_session)
}
/// Loads the `files` rows whose session key appears in `session_keys`, keyed by session key.
///
/// Keys are bound in batches of at most `QUERY_KEY_CHUNK_SIZE` per `IN (...)` query. The
/// result omits keys that have no row, and rows rejected by
/// `RawStoredFileRecord::into_valid_record`. SQLite errors are propagated.
pub(super) fn load_file_records_for_keys(
    connection: &Connection,
    session_keys: &[&str],
) -> Result<HashMap<String, StoredFileRecord>> {
    let mut by_session = HashMap::new();
    // An empty slice yields no batches, so no query is issued and the map stays empty.
    for batch in session_keys.chunks(QUERY_KEY_CHUNK_SIZE) {
        let placeholders = vec!["?"; batch.len()].join(", ");
        let sql = format!(
            "SELECT {FILE_RECORD_COLUMNS} FROM files WHERE session_key IN ({placeholders})"
        );
        let mut statement = connection.prepare(&sql)?;
        let raw_rows = statement.query_map(
            params_from_iter(batch.iter().copied()),
            RawStoredFileRecord::from_row,
        )?;
        for raw in raw_rows {
            if let Some(record) = raw?.into_valid_record() {
                by_session.insert(record.session_key.clone(), record);
            }
        }
    }
    Ok(by_session)
}
pub(super) fn update_file_record_conditionally(
transaction: &Transaction<'_>,
record: &StoredFileRecord,
generation: i64,
cache_entry: &ScannedCacheEntry,
aggregates: &FileAggregateSet,
) -> Result<usize> {
let total = aggregates.total_usage();
let fallback_total = aggregates.fallback_usage();
Ok(transaction.execute(
"UPDATE files SET generation = ?1, parser_version = ?2, file_format = ?3, size = ?4, \
mtime_ns = ?5, dev = ?6, ino = ?7, ctime_ns = ?8, checkpoint_offset = ?9, \
previous_input = ?10, previous_cached_input = ?11, previous_output = ?12, \
previous_reasoning_output = ?13, previous_total = ?14, current_model = ?15, \
current_model_is_fallback = ?16, content_hash = ?17, total_input = ?18, \
total_cached_input = ?19, total_output = ?20, total_reasoning_output = ?21, \
total_tokens = ?22, fallback_input = ?23, fallback_cached_input = ?24, \
fallback_output = ?25, fallback_reasoning_output = ?26, fallback_total = ?27 \
WHERE session_key = ?28 AND path = ?29 AND generation = ?30 AND size = ?31 \
AND checkpoint_offset = ?32",
params![
generation,
PARSER_VERSION,
cache_entry.metadata.file_format.as_str(),
u64_to_i64(cache_entry.metadata.size)?,
cache_entry.metadata.mtime_ns,
cache_entry.metadata.dev,
cache_entry.metadata.ino,
cache_entry.metadata.ctime_ns,
u64_to_i64(cache_entry.checkpoint.offset)?,
cache_entry
.checkpoint
.previous_totals
.map(|usage| u64_to_i64(usage.input))
.transpose()?,
cache_entry
.checkpoint
.previous_totals
.map(|usage| u64_to_i64(usage.cached_input))
.transpose()?,
cache_entry
.checkpoint
.previous_totals
.map(|usage| u64_to_i64(usage.output))
.transpose()?,
cache_entry
.checkpoint
.previous_totals
.map(|usage| u64_to_i64(usage.reasoning_output))
.transpose()?,
cache_entry
.checkpoint
.previous_totals
.map(|usage| u64_to_i64(usage.total))
.transpose()?,
cache_entry.checkpoint.current_model.as_deref(),
bool_to_i64(cache_entry.checkpoint.current_model_is_fallback),
cache_entry.content_hash.encode(),
u64_to_i64(total.input)?,
u64_to_i64(total.cached_input)?,
u64_to_i64(total.output)?,
u64_to_i64(total.reasoning_output)?,
u64_to_i64(total.total)?,
u64_to_i64(fallback_total.input)?,
u64_to_i64(fallback_total.cached_input)?,
u64_to_i64(fallback_total.output)?,
u64_to_i64(fallback_total.reasoning_output)?,
u64_to_i64(fallback_total.total)?,
record.session_key.as_str(),
record.path.as_str(),
record.generation,
u64_to_i64(record.metadata.size)?,
u64_to_i64(record.checkpoint.offset)?,
],
)?)
}
pub(super) fn upsert_file_record(
transaction: &Transaction<'_>,
generation: i64,
cache_entry: &ScannedCacheEntry,
aggregates: &FileAggregateSet,
) -> Result<()> {
let total = aggregates.total_usage();
let fallback_total = aggregates.fallback_usage();
transaction.execute(
"INSERT INTO files (
session_key, path, generation, parser_version, file_format, size, mtime_ns, dev,
ino, ctime_ns, checkpoint_offset, previous_input, previous_cached_input,
previous_output, previous_reasoning_output, previous_total, current_model,
current_model_is_fallback, content_hash, total_input, total_cached_input,
total_output, total_reasoning_output, total_tokens, fallback_input,
fallback_cached_input, fallback_output,
fallback_reasoning_output, fallback_total
) VALUES (
?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17,
?18, ?19, ?20, ?21, ?22, ?23, ?24, ?25, ?26, ?27, ?28, ?29
)
ON CONFLICT(session_key) DO UPDATE SET
path = excluded.path,
generation = excluded.generation,
parser_version = excluded.parser_version,
file_format = excluded.file_format,
size = excluded.size,
mtime_ns = excluded.mtime_ns,
dev = excluded.dev,
ino = excluded.ino,
ctime_ns = excluded.ctime_ns,
checkpoint_offset = excluded.checkpoint_offset,
previous_input = excluded.previous_input,
previous_cached_input = excluded.previous_cached_input,
previous_output = excluded.previous_output,
previous_reasoning_output = excluded.previous_reasoning_output,
previous_total = excluded.previous_total,
current_model = excluded.current_model,
current_model_is_fallback = excluded.current_model_is_fallback,
content_hash = excluded.content_hash,
total_input = excluded.total_input,
total_cached_input = excluded.total_cached_input,
total_output = excluded.total_output,
total_reasoning_output = excluded.total_reasoning_output,
total_tokens = excluded.total_tokens,
fallback_input = excluded.fallback_input,
fallback_cached_input = excluded.fallback_cached_input,
fallback_output = excluded.fallback_output,
fallback_reasoning_output = excluded.fallback_reasoning_output,
fallback_total = excluded.fallback_total",
params![
cache_entry.session_key.as_str(),
cache_entry.path.as_str(),
generation,
PARSER_VERSION,
cache_entry.metadata.file_format.as_str(),
u64_to_i64(cache_entry.metadata.size)?,
cache_entry.metadata.mtime_ns,
cache_entry.metadata.dev,
cache_entry.metadata.ino,
cache_entry.metadata.ctime_ns,
u64_to_i64(cache_entry.checkpoint.offset)?,
cache_entry
.checkpoint
.previous_totals
.map(|usage| u64_to_i64(usage.input))
.transpose()?,
cache_entry
.checkpoint
.previous_totals
.map(|usage| u64_to_i64(usage.cached_input))
.transpose()?,
cache_entry
.checkpoint
.previous_totals
.map(|usage| u64_to_i64(usage.output))
.transpose()?,
cache_entry
.checkpoint
.previous_totals
.map(|usage| u64_to_i64(usage.reasoning_output))
.transpose()?,
cache_entry
.checkpoint
.previous_totals
.map(|usage| u64_to_i64(usage.total))
.transpose()?,
cache_entry.checkpoint.current_model.as_deref(),
bool_to_i64(cache_entry.checkpoint.current_model_is_fallback),
cache_entry.content_hash.encode(),
u64_to_i64(total.input)?,
u64_to_i64(total.cached_input)?,
u64_to_i64(total.output)?,
u64_to_i64(total.reasoning_output)?,
u64_to_i64(total.total)?,
u64_to_i64(fallback_total.input)?,
u64_to_i64(fallback_total.cached_input)?,
u64_to_i64(fallback_total.output)?,
u64_to_i64(fallback_total.reasoning_output)?,
u64_to_i64(fallback_total.total)?,
],
)?;
Ok(())
}