llm_cost_ops_api/ingestion/
handler.rs1use async_trait::async_trait;
4use chrono::Utc;
5use tracing::{error, info, warn};
6use uuid::Uuid;
7use validator::Validate;
8
9use llm_cost_ops::{Result, UsageRecord};
10use llm_cost_ops::storage::UsageRepository;
11
12use super::models::{
13 IngestionError, IngestionResponse, IngestionStatus, UsageWebhookPayload,
14};
15use super::traits::{IngestionHandler, IngestionStorage, PayloadValidator};
16
17#[derive(Clone)]
19pub struct DefaultIngestionHandler<S: UsageRepository + Clone> {
20 storage: S,
21}
22
23impl<S: UsageRepository + Clone> DefaultIngestionHandler<S> {
24 pub fn new(storage: S) -> Self {
25 Self { storage }
26 }
27
28 fn process_payload(&self, payload: &UsageWebhookPayload) -> Result<UsageRecord> {
30 let validator = DefaultPayloadValidator;
31 validator.validate(payload)?;
33
34 Ok(payload.to_usage_record())
36 }
37}
38
39#[async_trait]
40impl<S: UsageRepository + Clone> IngestionHandler for DefaultIngestionHandler<S> {
41 async fn handle_single(&self, payload: UsageWebhookPayload) -> Result<IngestionResponse> {
42 let request_id = payload.request_id;
43
44 info!(
45 request_id = %request_id,
46 organization_id = %payload.organization_id,
47 provider = %payload.provider,
48 "Processing single usage record"
49 );
50
51 match self.process_payload(&payload) {
53 Ok(record) => {
54 match self.storage.create(&record).await {
56 Ok(_) => {
57 info!(
58 request_id = %request_id,
59 "Successfully ingested usage record"
60 );
61
62 Ok(IngestionResponse {
63 request_id,
64 status: IngestionStatus::Success,
65 accepted: 1,
66 rejected: 0,
67 errors: vec![],
68 processed_at: Utc::now(),
69 })
70 }
71 Err(e) => {
72 error!(
73 request_id = %request_id,
74 error = %e,
75 "Failed to store usage record"
76 );
77
78 Ok(IngestionResponse {
79 request_id,
80 status: IngestionStatus::Failed,
81 accepted: 0,
82 rejected: 1,
83 errors: vec![IngestionError {
84 index: None,
85 code: "STORAGE_ERROR".to_string(),
86 message: e.to_string(),
87 field: None,
88 }],
89 processed_at: Utc::now(),
90 })
91 }
92 }
93 }
94 Err(e) => {
95 warn!(
96 request_id = %request_id,
97 error = %e,
98 "Validation failed for usage record"
99 );
100
101 Ok(IngestionResponse {
102 request_id,
103 status: IngestionStatus::Failed,
104 accepted: 0,
105 rejected: 1,
106 errors: vec![IngestionError {
107 index: None,
108 code: "VALIDATION_ERROR".to_string(),
109 message: e.to_string(),
110 field: None,
111 }],
112 processed_at: Utc::now(),
113 })
114 }
115 }
116 }
117
118 async fn handle_batch(
119 &self,
120 payloads: Vec<UsageWebhookPayload>,
121 ) -> Result<IngestionResponse> {
122 let batch_id = Uuid::new_v4();
123 let batch_size = payloads.len();
124
125 info!(
126 batch_id = %batch_id,
127 batch_size = batch_size,
128 "Processing batch ingestion request"
129 );
130
131 let mut accepted = 0;
132 let mut rejected = 0;
133 let mut errors = Vec::new();
134
135 for (index, payload) in payloads.into_iter().enumerate() {
136 match self.process_payload(&payload) {
137 Ok(record) => match self.storage.create(&record).await {
138 Ok(_) => {
139 accepted += 1;
140 }
141 Err(e) => {
142 rejected += 1;
143 errors.push(IngestionError {
144 index: Some(index),
145 code: "STORAGE_ERROR".to_string(),
146 message: e.to_string(),
147 field: None,
148 });
149 }
150 },
151 Err(e) => {
152 rejected += 1;
153 errors.push(IngestionError {
154 index: Some(index),
155 code: "VALIDATION_ERROR".to_string(),
156 message: e.to_string(),
157 field: None,
158 });
159 }
160 }
161 }
162
163 let status = if accepted == batch_size {
164 IngestionStatus::Success
165 } else if accepted > 0 {
166 IngestionStatus::Partial
167 } else {
168 IngestionStatus::Failed
169 };
170
171 info!(
172 batch_id = %batch_id,
173 status = ?status,
174 accepted = accepted,
175 rejected = rejected,
176 "Batch ingestion completed"
177 );
178
179 Ok(IngestionResponse {
180 request_id: batch_id,
181 status,
182 accepted,
183 rejected,
184 errors,
185 processed_at: Utc::now(),
186 })
187 }
188
189 fn name(&self) -> &str {
190 "default_ingestion_handler"
191 }
192
193 async fn health_check(&self) -> Result<bool> {
194 Ok(true)
196 }
197}
198
/// Stateless validator applying the default webhook payload rules.
#[derive(Debug, Clone, Copy, Default)]
struct DefaultPayloadValidator;
201
202impl PayloadValidator for DefaultPayloadValidator {
203 fn validate(&self, payload: &UsageWebhookPayload) -> Result<()> {
204 use llm_cost_ops::CostOpsError;
205
206 payload.validate().map_err(|e| {
207 CostOpsError::Validation(format!("Validation failed: {}", e))
208 })?;
209
210 if payload.usage.total_tokens
212 != payload.usage.prompt_tokens + payload.usage.completion_tokens
213 {
214 return Err(CostOpsError::TokenCountMismatch {
215 calculated: payload.usage.prompt_tokens + payload.usage.completion_tokens,
216 reported: payload.usage.total_tokens,
217 });
218 }
219
220 if let Some(cached) = payload.usage.cached_tokens {
221 if cached > payload.usage.prompt_tokens {
222 return Err(CostOpsError::Validation(format!(
223 "Cached tokens ({}) cannot exceed prompt tokens ({})",
224 cached,
225 payload.usage.prompt_tokens
226 )));
227 }
228 }
229
230 Ok(())
231 }
232
233 fn validate_batch(&self, payloads: &[UsageWebhookPayload]) -> Vec<Result<()>> {
234 payloads.iter().map(|p| self.validate(p)).collect()
235 }
236}
237
238pub struct StorageAdapter<R: UsageRepository> {
240 repository: R,
241}
242
243impl<R: UsageRepository> StorageAdapter<R> {
244 pub fn new(repository: R) -> Self {
245 Self { repository }
246 }
247}
248
249#[async_trait]
250impl<R: UsageRepository> IngestionStorage for StorageAdapter<R> {
251 async fn store_usage(&self, record: UsageRecord) -> Result<()> {
252 self.repository.create(&record).await
253 }
254
255 async fn store_batch(&self, records: Vec<UsageRecord>) -> Result<Vec<Result<()>>> {
256 let mut results = Vec::new();
257 for record in records {
258 results.push(self.repository.create(&record).await);
259 }
260 Ok(results)
261 }
262
263 async fn health(&self) -> Result<bool> {
264 Ok(true)
266 }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ingestion::models::{ModelWebhook, TokenUsageWebhook};
    use llm_cost_ops::CostOpsError;

    /// Builds a payload whose token counts are internally consistent
    /// (100 prompt + 50 completion = 150 total) and which has no cached tokens.
    fn create_test_payload() -> UsageWebhookPayload {
        UsageWebhookPayload {
            request_id: Uuid::new_v4(),
            timestamp: Utc::now(),
            provider: "openai".to_string(),
            model: ModelWebhook {
                name: "gpt-4".to_string(),
                version: None,
                context_window: Some(8192),
            },
            organization_id: "org-test".to_string(),
            project_id: Some("proj-test".to_string()),
            user_id: None,
            usage: TokenUsageWebhook {
                prompt_tokens: 100,
                completion_tokens: 50,
                total_tokens: 150,
                cached_tokens: None,
                reasoning_tokens: None,
            },
            performance: None,
            tags: vec![],
            metadata: Default::default(),
        }
    }

    #[test]
    fn test_payload_validation_success() {
        let validator = DefaultPayloadValidator;
        let payload = create_test_payload();

        assert!(validator.validate(&payload).is_ok());
    }

    #[test]
    fn test_payload_validation_token_mismatch() {
        let validator = DefaultPayloadValidator;
        let mut payload = create_test_payload();
        payload.usage.total_tokens = 999;

        assert!(matches!(
            validator.validate(&payload),
            Err(CostOpsError::TokenCountMismatch { .. })
        ));
    }

    #[test]
    fn test_payload_validation_cached_tokens_exceed_prompt() {
        let validator = DefaultPayloadValidator;
        let mut payload = create_test_payload();
        payload.usage.cached_tokens = Some(200);

        assert!(matches!(
            validator.validate(&payload),
            Err(CostOpsError::Validation(_))
        ));
    }

    #[test]
    fn test_payload_validation_cached_tokens_equal_prompt_is_accepted() {
        let validator = DefaultPayloadValidator;
        let mut payload = create_test_payload();
        // Boundary: the check rejects only cached > prompt.
        payload.usage.cached_tokens = Some(100);

        assert!(validator.validate(&payload).is_ok());
    }

    #[test]
    fn test_validate_batch_reports_each_payload_in_order() {
        let validator = DefaultPayloadValidator;
        let good = create_test_payload();
        let mut bad = create_test_payload();
        bad.usage.total_tokens = 999;

        let results = validator.validate_batch(&[good, bad]);

        assert_eq!(results.len(), 2);
        assert!(results[0].is_ok());
        assert!(results[1].is_err());
    }
}