#![allow(clippy::disallowed_methods, clippy::disallowed_types)]
use std::{fs, io::Write, sync::LazyLock};
use dashmap::DashMap;
use serde::Deserialize;
/// Process-wide read cursors for `consume_report`, keyed by the sanitized session id.
///
/// Each value is a byte offset into that session's report file; report lines that start
/// before the offset have already been counted by an earlier `consume_report` call. The map
/// lives only in memory, so the offsets start again from zero in a new process.
static REPORT_CURSORS: LazyLock<DashMap<String, u64>> = LazyLock::new(DashMap::new);
/// One record (one line) of a session's `.jsonl` report file.
///
/// JSON keys are the camelCase form of the field names (`sessionId`, `opType`, ...).
/// Fields without `#[serde(default)]` that are not `Option` must be present in the JSON,
/// otherwise deserialization of that line fails and `consume_report` skips the line.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ProxyReport {
/// Session identifier carried in the record (required key).
#[allow(dead_code)]
session_id: String,
/// Agent identifier carried in the record (required key).
#[allow(dead_code)]
agent_id: String,
/// Optional project path; `consume_report` keeps the first one it finds in the file.
#[allow(dead_code)]
project_path: Option<String>,
/// Optional user name; `consume_report` keeps the first one it finds in the file.
#[serde(default)]
user_name: Option<String>,
/// Operation kind (required key). `consume_report` gives `"rewrite-command"`,
/// `"compress-response"` and `"compress-schema"` their own sub-totals; any other value
/// only contributes to the overall total.
op_type: String,
/// Optional method label, copied verbatim into the breakdown entry.
#[allow(dead_code)]
method: Option<String>,
/// Token count before the operation (required key), copied into the breakdown entry.
#[allow(dead_code)]
before_tokens: u64,
/// Token count after the operation (required key), copied into the breakdown entry.
#[allow(dead_code)]
after_tokens: u64,
/// Tokens saved by the operation (required key); this is the value that is summed.
saved_tokens: u64,
/// Byte size before the operation; 0 when the key is absent.
#[allow(dead_code)]
#[serde(default)]
before_bytes: u64,
/// Byte size after the operation; 0 when the key is absent.
#[allow(dead_code)]
#[serde(default)]
after_bytes: u64,
/// Bytes saved by the operation; 0 when the key is absent.
#[allow(dead_code)]
#[serde(default)]
saved_bytes: u64,
/// Timestamp string of the record; empty when the key is absent. Its format is not
/// inspected anywhere in this file.
#[allow(dead_code)]
#[serde(default)]
timestamp: String,
}
/// Result of one `consume_report` pass: token savings from the report lines that were not
/// counted by an earlier pass, plus metadata taken from the report file.
#[derive(Debug, Default)]
pub(crate) struct TokenlessAccumulator {
/// Sum of `savedTokens` over all newly counted lines, whatever their operation kind.
pub total_saved: u64,
/// Portion of `total_saved` from lines whose `opType` is `"rewrite-command"`.
pub rtk_saved: u64,
/// Portion of `total_saved` from lines whose `opType` is `"compress-response"`.
pub response_saved: u64,
/// Portion of `total_saved` from lines whose `opType` is `"compress-schema"`.
pub schema_saved: u64,
/// JSON array (serialized to a string) with one entry per newly counted line;
/// left empty when no new line was counted.
pub breakdown_json: String,
/// First `projectPath` found in the report file, including lines counted earlier.
pub project_path: Option<String>,
/// First `userName` found in the report file, including lines counted earlier.
pub user_name: Option<String>,
}
/// Consumes the not-yet-counted part of a session's tokenless report file.
///
/// The report is read from `~/.tokenfleet-ai/tokenless/reports/<sid>.jsonl`, where `<sid>` is
/// `session_id` reduced to ASCII alphanumerics, `-` and `_` (at most 128 characters), so the
/// id cannot escape the reports directory. A per-session byte cursor in [`REPORT_CURSORS`]
/// remembers how far previous calls got; only lines starting at or after it are summed.
///
/// Returns `None` when the sanitized id is empty, the home directory is unknown, the file is
/// missing, unreadable or empty, or when there is neither a new record nor any
/// `projectPath` / `userName` metadata. Otherwise returns the accumulator; when no new record
/// was found it carries metadata only (all totals 0, empty `breakdown_json`).
///
/// Side effects: advances the session cursor, emits a `tracing` info event and appends a
/// best-effort entry via `write_consume_log`.
pub(crate) fn consume_report(session_id: &str) -> Option<TokenlessAccumulator> {
    // An empty input also ends up as an empty sanitized id, so one check covers both.
    let safe_sid: String = session_id
        .chars()
        .filter(|c| c.is_ascii_alphanumeric() || *c == '-' || *c == '_')
        .take(128)
        .collect();
    if safe_sid.is_empty() {
        return None;
    }

    let source = dirs::home_dir()?
        .join(".tokenfleet-ai")
        .join("tokenless")
        .join("reports")
        .join(format!("{safe_sid}.jsonl"));
    let content = fs::read_to_string(&source).ok()?;
    if content.is_empty() {
        return None;
    }

    // The map entry guard is confined to this block so the lock is released before the
    // logging and disk I/O below.
    let (mut acc, breakdown_items) = {
        let mut cursor = REPORT_CURSORS.entry(safe_sid).or_insert(0u64);
        let start_byte = match usize::try_from(*cursor) {
            Ok(pos) if pos <= content.len() => pos,
            // The file is shorter than what was already consumed (truncated or replaced):
            // start over from the beginning.
            _ => {
                *cursor = 0;
                0
            }
        };
        let (acc, items, consumed_end) = scan_report_lines(&content, start_byte);
        if consumed_end > start_byte {
            *cursor = consumed_end as u64;
        }
        (acc, items)
    };

    let has_meta = acc.project_path.is_some() || acc.user_name.is_some();
    if breakdown_items.is_empty() && !has_meta {
        return None;
    }

    tracing::info!(
        new_lines = breakdown_items.len(),
        total_new_saved = acc.total_saved,
        project_path = ?acc.project_path,
        "consumed tokenless report (incremental)"
    );
    // Best effort: a failure to write the consume log must not affect the result.
    let _ = write_consume_log(
        breakdown_items.len(),
        acc.total_saved,
        acc.project_path.as_deref(),
    );

    if !breakdown_items.is_empty() {
        acc.breakdown_json = serde_json::to_string(&breakdown_items).unwrap_or_default();
    }
    Some(acc)
}

/// Scans every line of `content` and folds the records into an accumulator.
///
/// Metadata (`project_path`, `user_name`) is taken from the first record that carries it,
/// wherever it is in the file. Savings and breakdown entries are produced only for records
/// whose line starts at byte `start_byte` or later. Blank and unparseable lines are skipped.
///
/// Returns the accumulator, the breakdown entries for the new records, and the byte offset
/// up to which the file is settled: the end of the last line that is newline-terminated or
/// already parses as a complete record. A trailing line that is still being written is left
/// beyond that offset so a later pass picks it up once it is complete.
fn scan_report_lines(
    content: &str,
    start_byte: usize,
) -> (TokenlessAccumulator, Vec<serde_json::Value>, usize) {
    let mut acc = TokenlessAccumulator::default();
    let mut items: Vec<serde_json::Value> = Vec::new();
    let mut consumed_end = 0usize;
    let mut offset = 0usize;

    for raw in content.split_inclusive('\n') {
        let line_start = offset;
        offset += raw.len();

        let trimmed = raw.trim();
        let parsed = if trimmed.is_empty() {
            None
        } else {
            serde_json::from_str::<ProxyReport>(trimmed).ok()
        };
        // Only the last piece can lack a newline; it counts as settled only if it parses.
        if raw.ends_with('\n') || parsed.is_some() {
            consumed_end = offset;
        }
        let Some(report) = parsed else {
            continue;
        };

        if acc.project_path.is_none() {
            acc.project_path.clone_from(&report.project_path);
        }
        if acc.user_name.is_none() {
            acc.user_name.clone_from(&report.user_name);
        }
        if line_start < start_byte {
            // Already counted by an earlier pass; it only contributes metadata.
            continue;
        }

        // Values come from a file on disk, so saturate instead of overflowing.
        let saved = report.saved_tokens;
        acc.total_saved = acc.total_saved.saturating_add(saved);
        match report.op_type.as_str() {
            "rewrite-command" => acc.rtk_saved = acc.rtk_saved.saturating_add(saved),
            "compress-response" => acc.response_saved = acc.response_saved.saturating_add(saved),
            "compress-schema" => acc.schema_saved = acc.schema_saved.saturating_add(saved),
            _ => {}
        }
        items.push(serde_json::json!({
            "op": report.op_type,
            "method": report.method,
            "beforeTokens": report.before_tokens,
            "afterTokens": report.after_tokens,
            "savedTokens": report.saved_tokens,
            "beforeBytes": report.before_bytes,
            "afterBytes": report.after_bytes,
            "savedBytes": report.saved_bytes,
        }));
    }

    (acc, items, consumed_end)
}
/// Appends one JSON line describing a consume pass to
/// `~/.tokenfleet-ai/agent-proxy/report-consume.log`, creating the directory and the file
/// when they do not exist yet.
///
/// The entry has the keys `ts` (current UTC time, RFC 3339), `lines`, `total_saved` and
/// `project_path` (`null` when `None`).
///
/// # Errors
///
/// Returns `Err(())` when the home directory is unknown, the directory cannot be created,
/// the entry cannot be serialized, or the file cannot be opened or written. The unit error
/// carries no detail on purpose: the caller treats this log as best effort.
fn write_consume_log(lines: usize, total_saved: u64, project_path: Option<&str>) -> Result<(), ()> {
    let log_dir = dirs::home_dir()
        .ok_or(())?
        .join(".tokenfleet-ai")
        .join("agent-proxy");
    // Blocking std::fs is intentional here; the file-level allow covers the same lint.
    #[allow(clippy::disallowed_methods)]
    std::fs::create_dir_all(&log_dir).map_err(|_| ())?;

    let entry = serde_json::json!({
        "ts": chrono::Utc::now().to_rfc3339(),
        "lines": lines,
        "total_saved": total_saved,
        "project_path": project_path,
    });
    // Build the complete record, newline included, so it reaches the append-mode file in a
    // single write and cannot be interleaved with a record from a concurrent writer.
    let mut record = serde_json::to_string(&entry).map_err(|_| ())?;
    record.push('\n');

    let mut file = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(log_dir.join("report-consume.log"))
        .map_err(|_| ())?;
    file.write_all(record.as_bytes()).map_err(|_| ())
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// In-memory stand-in for the line-scanning part of `consume_report`: scans `content`,
    /// counts only the records whose line starts at or after `*cursor`, and moves `*cursor`
    /// to the end of `content` when at least one new record was counted.
    ///
    /// Returns `None` when there is neither a new record nor any metadata.
    fn parse(content: &str, cursor: &mut u64) -> Option<TokenlessAccumulator> {
        // A cursor beyond the end of the content means "start over".
        let start = match usize::try_from(*cursor) {
            Ok(pos) if pos <= content.len() => pos,
            _ => 0,
        };
        let origin = content.as_ptr() as usize;
        let mut totals = TokenlessAccumulator::default();
        let mut entries: Vec<serde_json::Value> = Vec::new();

        for line in content.lines() {
            let text = line.trim();
            if text.is_empty() {
                continue;
            }
            let Ok(record) = serde_json::from_str::<ProxyReport>(text) else {
                continue;
            };
            // Metadata is picked up from any record, old or new.
            if totals.project_path.is_none() {
                totals.project_path.clone_from(&record.project_path);
            }
            if totals.user_name.is_none() {
                totals.user_name.clone_from(&record.user_name);
            }
            let line_start = line.as_ptr() as usize - origin;
            if line_start < start {
                continue;
            }
            let saved = record.saved_tokens;
            totals.total_saved += saved;
            match record.op_type.as_str() {
                "rewrite-command" => totals.rtk_saved += saved,
                "compress-response" => totals.response_saved += saved,
                "compress-schema" => totals.schema_saved += saved,
                _ => {}
            }
            entries.push(serde_json::json!({
                "op": record.op_type,
                "savedTokens": record.saved_tokens,
            }));
        }

        if entries.is_empty() {
            // Nothing new: only worth returning when metadata was found.
            if totals.project_path.is_none() && totals.user_name.is_none() {
                return None;
            }
        } else {
            *cursor = content.len() as u64;
            totals.breakdown_json = serde_json::to_string(&entries).unwrap_or_default();
        }
        Some(totals)
    }

    /// Empty content yields nothing.
    #[test]
    fn test_incremental_empty_returns_none() {
        let mut pos = 0u64;
        let outcome = parse("", &mut pos);
        assert!(outcome.is_none());
    }

    /// A first pass counts the record, reports its metadata and advances the cursor.
    #[test]
    fn test_incremental_first_read() {
        let record = r#"{"sessionId":"s","agentId":"a","projectPath":"test-proj","userName":"byx","opType":"rewrite-command","method":"Rtk","beforeTokens":100,"afterTokens":50,"savedTokens":50,"beforeBytes":400,"afterBytes":200,"savedBytes":200}"#;
        let mut pos = 0u64;
        let got = parse(record, &mut pos).unwrap();
        assert_eq!(got.total_saved, 50);
        assert_eq!(got.rtk_saved, 50);
        assert_eq!(got.response_saved, 0);
        assert_eq!(got.schema_saved, 0);
        assert_eq!(got.project_path.as_deref(), Some("test-proj"));
        assert_eq!(got.user_name.as_deref(), Some("byx"));
        assert!(pos > 0);
    }

    /// Savings are split per operation kind and also summed overall.
    #[test]
    fn test_per_category_splitting() {
        let records = [
            r#"{"sessionId":"s","agentId":"a","projectPath":"p","userName":"u","opType":"rewrite-command","method":"Rtk","beforeTokens":100,"afterTokens":50,"savedTokens":30,"beforeBytes":100,"afterBytes":50,"savedBytes":50}"#,
            r#"{"sessionId":"s","agentId":"a","opType":"rewrite-command","method":"Rtk","beforeTokens":50,"afterTokens":20,"savedTokens":25,"beforeBytes":200,"afterBytes":80,"savedBytes":120}"#,
            r#"{"sessionId":"s","agentId":"a","opType":"compress-response","method":"Standard","beforeTokens":500,"afterTokens":300,"savedTokens":200,"beforeBytes":2000,"afterBytes":1200,"savedBytes":800}"#,
            r#"{"sessionId":"s","agentId":"a","opType":"compress-schema","method":"ToonHrv","beforeTokens":1000,"afterTokens":700,"savedTokens":300,"beforeBytes":4000,"afterBytes":2800,"savedBytes":1200}"#,
        ];
        let body = records.join("\n");
        let mut pos = 0u64;
        let got = parse(&body, &mut pos).unwrap();
        assert_eq!(got.total_saved, 555);
        assert_eq!(got.rtk_saved, 55);
        assert_eq!(got.response_saved, 200);
        assert_eq!(got.schema_saved, 300);
        assert_eq!(got.project_path.as_deref(), Some("p"));
        assert_eq!(got.user_name.as_deref(), Some("u"));
    }

    /// A second pass over a grown file counts only the appended record.
    #[test]
    fn test_incremental_second_read_only_new_lines() {
        let first = r#"{"sessionId":"s","agentId":"a","projectPath":"p","userName":"u","opType":"rewrite-command","method":"Rtk","beforeTokens":100,"afterTokens":50,"savedTokens":50,"beforeBytes":400,"afterBytes":200,"savedBytes":200}"#;
        let appended = r#"{"sessionId":"s","agentId":"a","projectPath":"p","userName":"u","opType":"rewrite-command","method":"Rtk","beforeTokens":200,"afterTokens":100,"savedTokens":100,"beforeBytes":800,"afterBytes":400,"savedBytes":400}"#;
        let mut pos = 0u64;

        let initial = parse(first, &mut pos).unwrap();
        assert_eq!(initial.total_saved, 50);

        let mut grown = [first, appended].join("\n");
        grown.push('\n');
        let delta = parse(&grown, &mut pos).unwrap();
        assert_eq!(delta.total_saved, 100);
        assert_eq!(delta.project_path.as_deref(), Some("p"));
        assert_eq!(delta.user_name.as_deref(), Some("u"));
    }

    /// Re-reading unchanged content returns metadata only: zero totals, no breakdown.
    #[test]
    fn test_incremental_no_new_data_returns_meta() {
        let only = r#"{"sessionId":"s","agentId":"a","projectPath":"p","userName":"u","opType":"rewrite-command","method":"Rtk","beforeTokens":100,"afterTokens":50,"savedTokens":50,"beforeBytes":400,"afterBytes":200,"savedBytes":200}"#;
        let mut pos = 0u64;
        let _ = parse(only, &mut pos);
        let again = parse(only, &mut pos).unwrap();
        assert_eq!(again.total_saved, 0);
        assert_eq!(again.project_path.as_deref(), Some("p"));
        assert_eq!(again.user_name.as_deref(), Some("u"));
        assert!(again.breakdown_json.is_empty());
    }
}