Skip to main content

chainlink/db/
token_usage_db.rs

1use anyhow::Result;
2use chrono::Utc;
3use rusqlite::params;
4
5use super::{parse_datetime, Database};
6use crate::models::TokenUsage;
7use crate::token_usage::{ParsedUsage, UsageSummaryRow};
8
9impl Database {
10    pub fn create_token_usage(&self, usage: &ParsedUsage) -> Result<i64> {
11        let now = Utc::now().to_rfc3339();
12        self.conn.execute(
13            "INSERT INTO token_usage (agent_id, session_id, timestamp, input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, model, cost_estimate)
14             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
15            params![
16                usage.agent_id,
17                usage.session_id,
18                now,
19                usage.input_tokens,
20                usage.output_tokens,
21                usage.cache_read_tokens,
22                usage.cache_creation_tokens,
23                usage.model,
24                usage.cost_estimate
25            ],
26        )?;
27        Ok(self.conn.last_insert_rowid())
28    }
29
30    pub fn get_token_usage(&self, id: i64) -> Result<Option<TokenUsage>> {
31        let mut stmt = self.conn.prepare(
32            "SELECT id, agent_id, session_id, timestamp, input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, model, cost_estimate
33             FROM token_usage WHERE id = ?1",
34        )?;
35        let mut rows = stmt.query_map([id], |row| {
36            Ok(TokenUsage {
37                id: row.get(0)?,
38                agent_id: row.get(1)?,
39                session_id: row.get(2)?,
40                timestamp: parse_datetime(row.get::<_, String>(3)?),
41                input_tokens: row.get(4)?,
42                output_tokens: row.get(5)?,
43                cache_read_tokens: row.get(6)?,
44                cache_creation_tokens: row.get(7)?,
45                model: row.get(8)?,
46                cost_estimate: row.get(9)?,
47            })
48        })?;
49        match rows.next() {
50            Some(Ok(usage)) => Ok(Some(usage)),
51            Some(Err(e)) => Err(e.into()),
52            None => Ok(None),
53        }
54    }
55
56    pub fn list_token_usage(
57        &self,
58        agent_id: Option<&str>,
59        session_id: Option<i64>,
60        model: Option<&str>,
61        from: Option<&str>,
62        to: Option<&str>,
63        limit: Option<i64>,
64    ) -> Result<Vec<TokenUsage>> {
65        let mut conditions = Vec::new();
66        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
67
68        if let Some(aid) = agent_id {
69            conditions.push(format!("agent_id = ?{}", param_values.len() + 1));
70            param_values.push(Box::new(aid.to_string()));
71        }
72        if let Some(sid) = session_id {
73            conditions.push(format!("session_id = ?{}", param_values.len() + 1));
74            param_values.push(Box::new(sid));
75        }
76        if let Some(m) = model {
77            conditions.push(format!("model = ?{}", param_values.len() + 1));
78            param_values.push(Box::new(m.to_string()));
79        }
80        if let Some(f) = from {
81            conditions.push(format!("timestamp >= ?{}", param_values.len() + 1));
82            param_values.push(Box::new(f.to_string()));
83        }
84        if let Some(t) = to {
85            conditions.push(format!("timestamp <= ?{}", param_values.len() + 1));
86            param_values.push(Box::new(t.to_string()));
87        }
88
89        let where_clause = if conditions.is_empty() {
90            String::new()
91        } else {
92            format!(" WHERE {}", conditions.join(" AND "))
93        };
94
95        let limit_clause = match limit {
96            Some(l) => format!(" LIMIT {}", l),
97            None => String::new(),
98        };
99
100        let sql = format!(
101            "SELECT id, agent_id, session_id, timestamp, input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, model, cost_estimate
102             FROM token_usage{} ORDER BY timestamp DESC{}",
103            where_clause, limit_clause
104        );
105
106        let params_ref: Vec<&dyn rusqlite::types::ToSql> =
107            param_values.iter().map(|p| p.as_ref()).collect();
108        let mut stmt = self.conn.prepare(&sql)?;
109        let results = stmt
110            .query_map(params_ref.as_slice(), |row| {
111                Ok(TokenUsage {
112                    id: row.get(0)?,
113                    agent_id: row.get(1)?,
114                    session_id: row.get(2)?,
115                    timestamp: parse_datetime(row.get::<_, String>(3)?),
116                    input_tokens: row.get(4)?,
117                    output_tokens: row.get(5)?,
118                    cache_read_tokens: row.get(6)?,
119                    cache_creation_tokens: row.get(7)?,
120                    model: row.get(8)?,
121                    cost_estimate: row.get(9)?,
122                })
123            })?
124            .collect::<std::result::Result<Vec<_>, _>>()?;
125        Ok(results)
126    }
127
128    pub fn get_usage_summary(
129        &self,
130        agent_id: Option<&str>,
131        from: Option<&str>,
132        to: Option<&str>,
133    ) -> Result<Vec<UsageSummaryRow>> {
134        let mut conditions = Vec::new();
135        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
136
137        if let Some(aid) = agent_id {
138            conditions.push(format!("agent_id = ?{}", param_values.len() + 1));
139            param_values.push(Box::new(aid.to_string()));
140        }
141        if let Some(f) = from {
142            conditions.push(format!("timestamp >= ?{}", param_values.len() + 1));
143            param_values.push(Box::new(f.to_string()));
144        }
145        if let Some(t) = to {
146            conditions.push(format!("timestamp <= ?{}", param_values.len() + 1));
147            param_values.push(Box::new(t.to_string()));
148        }
149
150        let where_clause = if conditions.is_empty() {
151            String::new()
152        } else {
153            format!(" WHERE {}", conditions.join(" AND "))
154        };
155
156        let sql = format!(
157            "SELECT agent_id, model, COUNT(*) as request_count,
158                    SUM(input_tokens) as total_input,
159                    SUM(output_tokens) as total_output,
160                    COALESCE(SUM(cache_read_tokens), 0) as total_cache_read,
161                    COALESCE(SUM(cache_creation_tokens), 0) as total_cache_creation,
162                    COALESCE(SUM(cost_estimate), 0.0) as total_cost
163             FROM token_usage{}
164             GROUP BY agent_id, model
165             ORDER BY total_cost DESC",
166            where_clause
167        );
168
169        let params_ref: Vec<&dyn rusqlite::types::ToSql> =
170            param_values.iter().map(|p| p.as_ref()).collect();
171        let mut stmt = self.conn.prepare(&sql)?;
172        let results = stmt
173            .query_map(params_ref.as_slice(), |row| {
174                Ok(UsageSummaryRow {
175                    agent_id: row.get(0)?,
176                    model: row.get(1)?,
177                    request_count: row.get(2)?,
178                    total_input_tokens: row.get(3)?,
179                    total_output_tokens: row.get(4)?,
180                    total_cache_read_tokens: row.get(5)?,
181                    total_cache_creation_tokens: row.get(6)?,
182                    total_cost: row.get(7)?,
183                })
184            })?
185            .collect::<std::result::Result<Vec<_>, _>>()?;
186        Ok(results)
187    }
188}