llm_cost_ops_api/ingestion/
handler.rs

1// Core ingestion handler implementation
2
3use async_trait::async_trait;
4use chrono::Utc;
5use tracing::{error, info, warn};
6use uuid::Uuid;
7use validator::Validate;
8
9use llm_cost_ops::{Result, UsageRecord};
10use llm_cost_ops::storage::UsageRepository;
11
12use super::models::{
13    IngestionError, IngestionResponse, IngestionStatus, UsageWebhookPayload,
14};
15use super::traits::{IngestionHandler, IngestionStorage, PayloadValidator};
16
17/// Default ingestion handler that validates and stores usage records
18#[derive(Clone)]
19pub struct DefaultIngestionHandler<S: UsageRepository + Clone> {
20    storage: S,
21}
22
23impl<S: UsageRepository + Clone> DefaultIngestionHandler<S> {
24    pub fn new(storage: S) -> Self {
25        Self { storage }
26    }
27
28    /// Process and validate a payload
29    fn process_payload(&self, payload: &UsageWebhookPayload) -> Result<UsageRecord> {
30        let validator = DefaultPayloadValidator;
31        // Validate using validator
32        validator.validate(payload)?;
33
34        // Convert to domain record
35        Ok(payload.to_usage_record())
36    }
37}
38
39#[async_trait]
40impl<S: UsageRepository + Clone> IngestionHandler for DefaultIngestionHandler<S> {
41    async fn handle_single(&self, payload: UsageWebhookPayload) -> Result<IngestionResponse> {
42        let request_id = payload.request_id;
43
44        info!(
45            request_id = %request_id,
46            organization_id = %payload.organization_id,
47            provider = %payload.provider,
48            "Processing single usage record"
49        );
50
51        // Process and validate
52        match self.process_payload(&payload) {
53            Ok(record) => {
54                // Store in database
55                match self.storage.create(&record).await {
56                    Ok(_) => {
57                        info!(
58                            request_id = %request_id,
59                            "Successfully ingested usage record"
60                        );
61
62                        Ok(IngestionResponse {
63                            request_id,
64                            status: IngestionStatus::Success,
65                            accepted: 1,
66                            rejected: 0,
67                            errors: vec![],
68                            processed_at: Utc::now(),
69                        })
70                    }
71                    Err(e) => {
72                        error!(
73                            request_id = %request_id,
74                            error = %e,
75                            "Failed to store usage record"
76                        );
77
78                        Ok(IngestionResponse {
79                            request_id,
80                            status: IngestionStatus::Failed,
81                            accepted: 0,
82                            rejected: 1,
83                            errors: vec![IngestionError {
84                                index: None,
85                                code: "STORAGE_ERROR".to_string(),
86                                message: e.to_string(),
87                                field: None,
88                            }],
89                            processed_at: Utc::now(),
90                        })
91                    }
92                }
93            }
94            Err(e) => {
95                warn!(
96                    request_id = %request_id,
97                    error = %e,
98                    "Validation failed for usage record"
99                );
100
101                Ok(IngestionResponse {
102                    request_id,
103                    status: IngestionStatus::Failed,
104                    accepted: 0,
105                    rejected: 1,
106                    errors: vec![IngestionError {
107                        index: None,
108                        code: "VALIDATION_ERROR".to_string(),
109                        message: e.to_string(),
110                        field: None,
111                    }],
112                    processed_at: Utc::now(),
113                })
114            }
115        }
116    }
117
118    async fn handle_batch(
119        &self,
120        payloads: Vec<UsageWebhookPayload>,
121    ) -> Result<IngestionResponse> {
122        let batch_id = Uuid::new_v4();
123        let batch_size = payloads.len();
124
125        info!(
126            batch_id = %batch_id,
127            batch_size = batch_size,
128            "Processing batch ingestion request"
129        );
130
131        let mut accepted = 0;
132        let mut rejected = 0;
133        let mut errors = Vec::new();
134
135        for (index, payload) in payloads.into_iter().enumerate() {
136            match self.process_payload(&payload) {
137                Ok(record) => match self.storage.create(&record).await {
138                    Ok(_) => {
139                        accepted += 1;
140                    }
141                    Err(e) => {
142                        rejected += 1;
143                        errors.push(IngestionError {
144                            index: Some(index),
145                            code: "STORAGE_ERROR".to_string(),
146                            message: e.to_string(),
147                            field: None,
148                        });
149                    }
150                },
151                Err(e) => {
152                    rejected += 1;
153                    errors.push(IngestionError {
154                        index: Some(index),
155                        code: "VALIDATION_ERROR".to_string(),
156                        message: e.to_string(),
157                        field: None,
158                    });
159                }
160            }
161        }
162
163        let status = if accepted == batch_size {
164            IngestionStatus::Success
165        } else if accepted > 0 {
166            IngestionStatus::Partial
167        } else {
168            IngestionStatus::Failed
169        };
170
171        info!(
172            batch_id = %batch_id,
173            status = ?status,
174            accepted = accepted,
175            rejected = rejected,
176            "Batch ingestion completed"
177        );
178
179        Ok(IngestionResponse {
180            request_id: batch_id,
181            status,
182            accepted,
183            rejected,
184            errors,
185            processed_at: Utc::now(),
186        })
187    }
188
189    fn name(&self) -> &str {
190        "default_ingestion_handler"
191    }
192
193    async fn health_check(&self) -> Result<bool> {
194        // Could add more sophisticated health checks here
195        Ok(true)
196    }
197}
198
199/// Default payload validator using the validator crate
200struct DefaultPayloadValidator;
201
202impl PayloadValidator for DefaultPayloadValidator {
203    fn validate(&self, payload: &UsageWebhookPayload) -> Result<()> {
204        use llm_cost_ops::CostOpsError;
205
206        payload.validate().map_err(|e| {
207            CostOpsError::Validation(format!("Validation failed: {}", e))
208        })?;
209
210        // Additional custom validations
211        if payload.usage.total_tokens
212            != payload.usage.prompt_tokens + payload.usage.completion_tokens
213        {
214            return Err(CostOpsError::TokenCountMismatch {
215                calculated: payload.usage.prompt_tokens + payload.usage.completion_tokens,
216                reported: payload.usage.total_tokens,
217            });
218        }
219
220        if let Some(cached) = payload.usage.cached_tokens {
221            if cached > payload.usage.prompt_tokens {
222                return Err(CostOpsError::Validation(format!(
223                    "Cached tokens ({}) cannot exceed prompt tokens ({})",
224                    cached,
225                    payload.usage.prompt_tokens
226                )));
227            }
228        }
229
230        Ok(())
231    }
232
233    fn validate_batch(&self, payloads: &[UsageWebhookPayload]) -> Vec<Result<()>> {
234        payloads.iter().map(|p| self.validate(p)).collect()
235    }
236}
237
238/// Ingestion storage adapter for UsageRepository
239pub struct StorageAdapter<R: UsageRepository> {
240    repository: R,
241}
242
243impl<R: UsageRepository> StorageAdapter<R> {
244    pub fn new(repository: R) -> Self {
245        Self { repository }
246    }
247}
248
249#[async_trait]
250impl<R: UsageRepository> IngestionStorage for StorageAdapter<R> {
251    async fn store_usage(&self, record: UsageRecord) -> Result<()> {
252        self.repository.create(&record).await
253    }
254
255    async fn store_batch(&self, records: Vec<UsageRecord>) -> Result<Vec<Result<()>>> {
256        let mut results = Vec::new();
257        for record in records {
258            results.push(self.repository.create(&record).await);
259        }
260        Ok(results)
261    }
262
263    async fn health(&self) -> Result<bool> {
264        // Simple health check - could be enhanced
265        Ok(true)
266    }
267}
268
#[cfg(test)]
mod tests {
    use super::super::models::ModelWebhook;
    use super::*;
    use crate::ingestion::models::TokenUsageWebhook;
    use chrono::Utc;

    /// Builds a payload with consistent token counts (100 + 50 = 150) and no
    /// cached tokens; individual tests mutate it to trigger a specific failure.
    fn create_test_payload() -> UsageWebhookPayload {
        let model = ModelWebhook {
            name: "gpt-4".to_string(),
            version: None,
            context_window: Some(8192),
        };
        let usage = TokenUsageWebhook {
            prompt_tokens: 100,
            completion_tokens: 50,
            total_tokens: 150,
            cached_tokens: None,
            reasoning_tokens: None,
        };

        UsageWebhookPayload {
            request_id: Uuid::new_v4(),
            timestamp: Utc::now(),
            provider: "openai".to_string(),
            model,
            organization_id: "org-test".to_string(),
            project_id: Some("proj-test".to_string()),
            user_id: None,
            usage,
            performance: None,
            tags: vec![],
            metadata: Default::default(),
        }
    }

    #[test]
    fn test_payload_validation_success() {
        let outcome = DefaultPayloadValidator.validate(&create_test_payload());

        assert!(outcome.is_ok());
    }

    #[test]
    fn test_payload_validation_token_mismatch() {
        let mut bad_total = create_test_payload();
        // 100 prompt + 50 completion tokens can never add up to 999.
        bad_total.usage.total_tokens = 999;

        let outcome = DefaultPayloadValidator.validate(&bad_total);

        assert!(outcome.is_err());
    }

    #[test]
    fn test_payload_validation_cached_tokens_exceed_prompt() {
        let mut too_many_cached = create_test_payload();
        // 200 cached tokens is more than the 100 prompt tokens of the fixture.
        too_many_cached.usage.cached_tokens = Some(200);

        let outcome = DefaultPayloadValidator.validate(&too_many_cached);

        assert!(outcome.is_err());
    }
}
326}