use std::fs::OpenOptions;
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use serde::Serialize;
use crate::msg::LlmEvent;
use crate::request::Provider;
use crate::types::UsageStats;
use super::fallback::CommittedUpstream;
#[derive(Debug, Serialize, Clone)]
pub struct UsageRecord {
pub ts: String,
pub auth_token: Option<String>,
pub user: Option<String>,
pub wire_format: &'static str,
pub model: String,
pub upstream_provider: Option<String>,
pub upstream_model: Option<String>,
pub input_tokens: usize,
pub output_tokens: usize,
pub cache_read_tokens: usize,
pub cache_creation_tokens: usize,
pub reasoning_tokens: usize,
pub duration_ms: u64,
pub status: &'static str,
pub error: Option<String>,
pub streaming: bool,
}
impl UsageRecord {
pub fn builder(wire_format: &'static str, model: impl Into<String>) -> UsageRecordBuilder {
UsageRecordBuilder {
wire_format,
model: model.into(),
auth_token: None,
user: None,
upstream_provider: None,
upstream_model: None,
usage: UsageStats::default(),
duration_ms: 0,
status: "ok",
error: None,
streaming: false,
}
}
}
#[derive(Debug)]
pub struct UsageRecordBuilder {
wire_format: &'static str,
model: String,
auth_token: Option<String>,
user: Option<String>,
upstream_provider: Option<Provider>,
upstream_model: Option<String>,
usage: UsageStats,
duration_ms: u64,
status: &'static str,
error: Option<String>,
streaming: bool,
}
impl UsageRecordBuilder {
pub fn auth_token(mut self, t: Option<String>) -> Self {
self.auth_token = t;
self
}
pub fn user(mut self, u: Option<String>) -> Self {
self.user = u;
self
}
pub fn upstream(mut self, provider: Provider, model: impl Into<String>) -> Self {
self.upstream_provider = Some(provider);
self.upstream_model = Some(model.into());
self
}
pub fn usage(mut self, u: UsageStats) -> Self {
self.usage = u;
self
}
pub fn duration_ms(mut self, ms: u64) -> Self {
self.duration_ms = ms;
self
}
pub fn streaming(mut self, s: bool) -> Self {
self.streaming = s;
self
}
pub fn ok(mut self) -> Self {
self.status = "ok";
self.error = None;
self
}
pub fn error(mut self, msg: impl Into<String>) -> Self {
self.status = "error";
self.error = Some(msg.into());
self
}
pub fn build(self) -> UsageRecord {
UsageRecord {
ts: rfc3339_now(),
auth_token: self.auth_token,
user: self.user,
wire_format: self.wire_format,
model: self.model,
upstream_provider: self.upstream_provider.map(format_provider),
upstream_model: self.upstream_model,
input_tokens: self.usage.prompt_tokens,
output_tokens: self.usage.completion_tokens,
cache_read_tokens: self.usage.cache_read_tokens,
cache_creation_tokens: self.usage.cache_creation_tokens,
reasoning_tokens: self.usage.reasoning_tokens,
duration_ms: self.duration_ms,
status: self.status,
error: self.error,
streaming: self.streaming,
}
}
}
fn format_provider(p: Provider) -> String {
format!("{:?}", p)
}
#[derive(Debug)]
pub struct UsageLogger {
inner: Mutex<Inner>,
}
#[derive(Debug)]
struct Inner {
writer: BufWriter<std::fs::File>,
flush_each: bool,
}
impl UsageLogger {
pub fn open(path: impl Into<PathBuf>, flush_each: bool) -> std::io::Result<Self> {
let path = path.into();
let file = OpenOptions::new().create(true).append(true).open(&path)?;
Ok(Self {
inner: Mutex::new(Inner {
writer: BufWriter::new(file),
flush_each,
}),
})
}
pub fn log(&self, record: &UsageRecord) {
let Ok(mut g) = self.inner.lock() else {
return;
};
if let Ok(line) = serde_json::to_string(record) {
let _ = g.writer.write_all(line.as_bytes());
let _ = g.writer.write_all(b"\n");
if g.flush_each {
let _ = g.writer.flush();
}
}
}
pub fn flush(&self) {
if let Ok(mut g) = self.inner.lock() {
let _ = g.writer.flush();
}
}
}
impl Drop for UsageLogger {
fn drop(&mut self) {
self.flush();
}
}
pub fn parse_bearer_token(header: Option<&str>) -> Option<String> {
let raw = header?.trim();
let token = raw
.strip_prefix("Bearer ")
.or_else(|| raw.strip_prefix("bearer "))
.unwrap_or(raw);
if token.is_empty() {
None
} else {
Some(token.to_string())
}
}
#[derive(Debug, Clone)]
pub struct AuthedUser {
pub token: String,
pub user: String,
}
pub fn extract_client_token(headers: &axum::http::HeaderMap) -> Option<String> {
if let Some(t) = parse_bearer_token(
headers
.get(axum::http::header::AUTHORIZATION)
.and_then(|v| v.to_str().ok()),
) {
return Some(t);
}
let xapi = headers.get("x-api-key").and_then(|v| v.to_str().ok())?;
let trimmed = xapi.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_string())
}
}
#[derive(Debug)]
pub struct UsageTracker {
logger: Option<Arc<UsageLogger>>,
started_at: Instant,
wire_format: &'static str,
model: String,
auth_token: Option<String>,
user: Option<String>,
streaming: bool,
committed: Option<CommittedUpstream>,
last_usage: UsageStats,
status: &'static str,
error: Option<String>,
}
impl UsageTracker {
pub fn new(
logger: Option<Arc<UsageLogger>>,
wire_format: &'static str,
model: impl Into<String>,
auth_token: Option<String>,
streaming: bool,
) -> Self {
Self {
logger,
started_at: Instant::now(),
wire_format,
model: model.into(),
auth_token,
user: None,
streaming,
committed: None,
last_usage: UsageStats::default(),
status: "ok",
error: None,
}
}
pub fn set_user(&mut self, user: Option<String>) {
self.user = user;
}
pub fn set_committed(&mut self, c: CommittedUpstream) {
self.committed = Some(c);
}
pub fn set_usage(&mut self, u: UsageStats) {
self.last_usage = u;
}
pub fn observe(&mut self, ev: &LlmEvent) -> bool {
match ev {
LlmEvent::Usage(u) => {
self.last_usage = u.clone();
false
}
LlmEvent::Done => true,
LlmEvent::Error(e) => {
self.status = "error";
self.error = Some(e.clone());
true
}
_ => false,
}
}
pub fn mark_error(&mut self, msg: impl Into<String>) {
self.status = "error";
self.error = Some(msg.into());
}
pub fn finalize(self) {
let Some(logger) = self.logger else {
return;
};
let duration_ms = self.started_at.elapsed().as_millis() as u64;
let mut builder = UsageRecord::builder(self.wire_format, self.model)
.auth_token(self.auth_token)
.user(self.user)
.usage(self.last_usage)
.duration_ms(duration_ms)
.streaming(self.streaming);
if let Some(c) = self.committed {
builder = builder.upstream(c.provider, c.model);
}
let rec = if self.status == "error" {
let msg = self.error.unwrap_or_else(|| "unknown error".to_string());
builder.error(msg).build()
} else {
builder.ok().build()
};
logger.log(&rec);
}
}
fn rfc3339_now() -> String {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default();
let secs = now.as_secs() as i64;
let millis = now.subsec_millis();
let days = secs.div_euclid(86_400);
let sec_of_day = secs.rem_euclid(86_400);
let (h, m, s) = (
(sec_of_day / 3600) as u32,
((sec_of_day / 60) % 60) as u32,
(sec_of_day % 60) as u32,
);
let (year, month, day) = days_to_ymd(days);
format!(
"{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}Z",
year, month, day, h, m, s, millis
)
}
fn days_to_ymd(days_since_epoch: i64) -> (i32, u32, u32) {
let z = days_since_epoch + 719_468;
let era = if z >= 0 { z } else { z - 146_096 } / 146_097;
let doe = (z - era * 146_097) as u64; let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365; let y = yoe as i64 + era * 400;
let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); let mp = (5 * doy + 2) / 153; let d = (doy - (153 * mp + 2) / 5 + 1) as u32; let m = if mp < 10 { mp + 3 } else { mp - 9 } as u32; let y = if m <= 2 { y + 1 } else { y } as i32;
(y, m, d)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn ymd_known_dates() {
assert_eq!(days_to_ymd(0), (1970, 1, 1));
assert_eq!(days_to_ymd(10957), (2000, 1, 1));
assert_eq!(days_to_ymd(19782), (2024, 2, 29));
}
#[test]
fn rfc3339_format() {
let s = rfc3339_now();
assert!(s.len() >= 24, "got {s}");
assert!(s.ends_with('Z'), "got {s}");
assert_eq!(&s[4..5], "-");
}
#[test]
fn logger_writes_jsonl() {
let tmp = tempfile_path();
let logger = UsageLogger::open(&tmp, true).unwrap();
let rec = UsageRecord::builder("anthropic", "claude-sonnet-4-5")
.auth_token(Some("sk-relay-test".into()))
.upstream(Provider::Anthropic, "claude-sonnet-4-5")
.usage(UsageStats {
prompt_tokens: 100,
completion_tokens: 50,
total_tokens: 150,
..Default::default()
})
.duration_ms(2000)
.streaming(true)
.ok()
.build();
logger.log(&rec);
logger.flush();
drop(logger);
let content = std::fs::read_to_string(&tmp).unwrap();
let _ = std::fs::remove_file(&tmp);
let lines: Vec<&str> = content.lines().collect();
assert_eq!(lines.len(), 1);
let parsed: serde_json::Value = serde_json::from_str(lines[0]).unwrap();
assert_eq!(parsed["auth_token"], "sk-relay-test");
assert_eq!(parsed["wire_format"], "anthropic");
assert_eq!(parsed["input_tokens"], 100);
assert_eq!(parsed["output_tokens"], 50);
assert_eq!(parsed["status"], "ok");
assert_eq!(parsed["streaming"], true);
assert_eq!(parsed["upstream_provider"], "Anthropic");
}
#[test]
fn logger_records_error_status() {
let tmp = tempfile_path();
let logger = UsageLogger::open(&tmp, true).unwrap();
let rec = UsageRecord::builder("openai_chat", "x")
.error("upstream 503")
.build();
logger.log(&rec);
drop(logger);
let content = std::fs::read_to_string(&tmp).unwrap();
let _ = std::fs::remove_file(&tmp);
let parsed: serde_json::Value = serde_json::from_str(content.trim()).unwrap();
assert_eq!(parsed["status"], "error");
assert_eq!(parsed["error"], "upstream 503");
}
fn tempfile_path() -> PathBuf {
let dir = std::env::temp_dir();
let name = format!(
"agentix_usage_test_{}.jsonl",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
);
dir.join(name)
}
}