use std::collections::HashMap;
use std::sync::RwLock;
use super::counter::UsageEvent;
pub struct UsageStore {
events: RwLock<Vec<UsageEvent>>,
max_events: usize,
user_totals: RwLock<HashMap<String, u64>>,
org_totals: RwLock<HashMap<String, u64>>,
tenant_totals: RwLock<HashMap<u64, TenantUsageSummary>>,
}
impl UsageStore {
pub fn new(max_events: usize) -> Self {
Self {
events: RwLock::new(Vec::with_capacity(max_events.min(100_000))),
max_events,
user_totals: RwLock::new(HashMap::new()),
org_totals: RwLock::new(HashMap::new()),
tenant_totals: RwLock::new(HashMap::new()),
}
}
pub fn ingest(&self, events: Vec<UsageEvent>) {
{
let mut user_totals = self.user_totals.write().unwrap_or_else(|p| p.into_inner());
let mut org_totals = self.org_totals.write().unwrap_or_else(|p| p.into_inner());
for e in &events {
*user_totals.entry(e.auth_user_id.clone()).or_insert(0) += e.tokens;
if !e.org_id.is_empty() {
*org_totals.entry(e.org_id.clone()).or_insert(0) += e.tokens;
}
}
}
{
let mut tenant_totals = self
.tenant_totals
.write()
.unwrap_or_else(|p| p.into_inner());
for e in &events {
let summary = tenant_totals.entry(e.tenant_id).or_default();
accumulate_event(summary, e);
}
}
let mut stored = self.events.write().unwrap_or_else(|p| p.into_inner());
for e in events {
if stored.len() >= self.max_events {
stored.remove(0); }
stored.push(e);
}
}
pub fn user_total(&self, user_id: &str) -> u64 {
let totals = self.user_totals.read().unwrap_or_else(|p| p.into_inner());
*totals.get(user_id).unwrap_or(&0)
}
pub fn org_total(&self, org_id: &str) -> u64 {
let totals = self.org_totals.read().unwrap_or_else(|p| p.into_inner());
*totals.get(org_id).unwrap_or(&0)
}
pub fn query(
&self,
user_filter: Option<&str>,
org_filter: Option<&str>,
since_secs: u64,
) -> Vec<UsageEvent> {
let events = self.events.read().unwrap_or_else(|p| p.into_inner());
events
.iter()
.filter(|e| {
let user_ok = user_filter.is_none_or(|u| e.auth_user_id == u);
let org_ok = org_filter.is_none_or(|o| e.org_id == o);
let time_ok = since_secs == 0 || e.timestamp_secs >= since_secs;
user_ok && org_ok && time_ok
})
.cloned()
.collect()
}
pub fn export_ndjson(&self, user_filter: Option<&str>, since_secs: u64) -> String {
let events = self.query(user_filter, None, since_secs);
events
.iter()
.map(|e| {
serde_json::json!({
"auth_user_id": e.auth_user_id,
"org_id": e.org_id,
"tenant_id": e.tenant_id,
"collection": e.collection,
"engine": e.engine,
"operation": e.operation,
"tokens": e.tokens,
"timestamp": e.timestamp_secs,
})
.to_string()
})
.collect::<Vec<_>>()
.join("\n")
}
pub fn query_by_tenant(&self, tenant_id: u64, since_secs: u64) -> Vec<UsageEvent> {
let events = self.events.read().unwrap_or_else(|p| p.into_inner());
events
.iter()
.filter(|e| {
e.tenant_id == tenant_id && (since_secs == 0 || e.timestamp_secs >= since_secs)
})
.cloned()
.collect()
}
pub fn export_tenant_json(&self, tenant_id: u64, since_secs: u64) -> String {
let events = self.query_by_tenant(tenant_id, since_secs);
let mut summary = TenantUsageSummary::default();
for e in &events {
accumulate_event(&mut summary, e);
}
serde_json::json!({
"tenant_id": tenant_id,
"reads": { "count": summary.reads_count, "tokens": summary.reads_tokens },
"writes": { "count": summary.writes_count, "tokens": summary.writes_tokens },
"vector_searches": summary.vector_searches,
"graph_traversals": summary.graph_traversals,
"total_events": summary.total_events,
})
.to_string()
}
pub fn aggregate_by_tenant(&self) -> HashMap<u64, TenantUsageSummary> {
let events = self.events.read().unwrap_or_else(|p| p.into_inner());
let mut summaries: HashMap<u64, TenantUsageSummary> = HashMap::new();
for e in events.iter() {
let summary = summaries.entry(e.tenant_id).or_default();
accumulate_event(summary, e);
}
summaries
}
pub fn tenant_summary(&self, tenant_id: u64) -> Option<TenantUsageSummary> {
let totals = self.tenant_totals.read().unwrap_or_else(|p| p.into_inner());
totals.get(&tenant_id).cloned()
}
pub fn count(&self) -> usize {
self.events.read().unwrap_or_else(|p| p.into_inner()).len()
}
}
fn accumulate_event(summary: &mut TenantUsageSummary, e: &UsageEvent) {
summary.total_tokens += e.tokens;
summary.total_events += 1;
if is_read_operation(&e.operation) {
summary.reads_count += 1;
summary.reads_tokens += e.tokens;
} else {
summary.writes_count += 1;
summary.writes_tokens += e.tokens;
}
if e.operation == "vector_search" {
summary.vector_searches += 1;
}
if e.engine == "graph" {
summary.graph_traversals += 1;
}
}
fn is_read_operation(operation: &str) -> bool {
matches!(
operation,
"point_get"
| "range_scan"
| "kv_get"
| "kv_scan"
| "text_search"
| "vector_search"
| "timeseries_scan"
| "columnar_scan"
)
}
#[derive(Debug, Default, Clone)]
pub struct TenantUsageSummary {
pub total_tokens: u64,
pub total_events: u64,
pub reads_count: u64,
pub reads_tokens: u64,
pub writes_count: u64,
pub writes_tokens: u64,
pub vector_searches: u64,
pub graph_traversals: u64,
}
impl Default for UsageStore {
fn default() -> Self {
Self::new(100_000)
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_event(user: &str, tokens: u64) -> UsageEvent {
UsageEvent {
auth_user_id: user.into(),
org_id: "acme".into(),
tenant_id: 1,
collection: "orders".into(),
engine: "document_schemaless".into(),
operation: "point_get".into(),
tokens,
timestamp_secs: 1700000000,
}
}
#[test]
fn ingest_and_query() {
let store = UsageStore::new(1000);
store.ingest(vec![test_event("u1", 10), test_event("u2", 20)]);
assert_eq!(store.count(), 2);
assert_eq!(store.user_total("u1"), 10);
assert_eq!(store.user_total("u2"), 20);
assert_eq!(store.org_total("acme"), 30);
}
#[test]
fn query_with_filter() {
let store = UsageStore::new(1000);
store.ingest(vec![test_event("u1", 10), test_event("u2", 20)]);
let u1_events = store.query(Some("u1"), None, 0);
assert_eq!(u1_events.len(), 1);
assert_eq!(u1_events[0].tokens, 10);
}
#[test]
fn ring_buffer_drops_oldest() {
let store = UsageStore::new(2);
store.ingest(vec![
test_event("u1", 1),
test_event("u2", 2),
test_event("u3", 3),
]);
assert_eq!(store.count(), 2); }
}