llm_cost_ops_api/ingestion/
models.rs

// Ingestion domain models for Observatory integration
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use uuid::Uuid;
7use validator::Validate;
8
9use llm_cost_ops::domain::{ModelIdentifier, Provider};
10
11/// Webhook payload for usage data ingestion
12#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
13pub struct UsageWebhookPayload {
14    /// Request identifier (optional, will be generated if not provided)
15    #[serde(default = "Uuid::new_v4")]
16    pub request_id: Uuid,
17
18    /// Timestamp of the LLM request
19    pub timestamp: DateTime<Utc>,
20
21    /// Provider information
22    #[validate(length(min = 1))]
23    pub provider: String,
24
25    /// Model information
26    #[validate]
27    pub model: ModelWebhook,
28
29    /// Organization identifier
30    #[validate(length(min = 1, max = 255))]
31    pub organization_id: String,
32
33    /// Project identifier (optional)
34    #[validate(length(max = 255))]
35    pub project_id: Option<String>,
36
37    /// User identifier (optional)
38    #[validate(length(max = 255))]
39    pub user_id: Option<String>,
40
41    /// Token usage information
42    #[validate]
43    pub usage: TokenUsageWebhook,
44
45    /// Performance metrics (optional)
46    pub performance: Option<PerformanceMetrics>,
47
48    /// Custom tags for categorization
49    #[serde(default)]
50    pub tags: Vec<String>,
51
52    /// Additional metadata
53    #[serde(default)]
54    pub metadata: HashMap<String, serde_json::Value>,
55}
56
57/// Model information in webhook payload
58#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
59pub struct ModelWebhook {
60    #[validate(length(min = 1, max = 255))]
61    pub name: String,
62
63    #[validate(length(max = 100))]
64    pub version: Option<String>,
65
66    #[validate(range(min = 1))]
67    pub context_window: Option<u64>,
68}
69
70/// Token usage information
71#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
72pub struct TokenUsageWebhook {
73    #[validate(range(min = 0))]
74    pub prompt_tokens: u64,
75
76    #[validate(range(min = 0))]
77    pub completion_tokens: u64,
78
79    #[validate(range(min = 0))]
80    pub total_tokens: u64,
81
82    #[validate(range(min = 0))]
83    pub cached_tokens: Option<u64>,
84
85    #[validate(range(min = 0))]
86    pub reasoning_tokens: Option<u64>,
87}
88
89/// Performance metrics
90#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
91pub struct PerformanceMetrics {
92    /// Total request latency in milliseconds
93    #[validate(range(min = 0))]
94    pub latency_ms: Option<u64>,
95
96    /// Time to first token in milliseconds
97    #[validate(range(min = 0))]
98    pub time_to_first_token_ms: Option<u64>,
99}
100
101/// Batch ingestion request for multiple usage records
102#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
103pub struct BatchIngestionRequest {
104    /// Batch identifier
105    #[serde(default = "Uuid::new_v4")]
106    pub batch_id: Uuid,
107
108    /// Source of the batch
109    #[validate(length(min = 1, max = 255))]
110    pub source: String,
111
112    /// Records to ingest
113    #[validate(length(min = 1, max = 1000))]
114    #[validate]
115    pub records: Vec<UsageWebhookPayload>,
116}
117
118/// Ingestion response
119#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct IngestionResponse {
121    /// Request identifier
122    pub request_id: Uuid,
123
124    /// Status of the ingestion
125    pub status: IngestionStatus,
126
127    /// Number of records accepted
128    pub accepted: usize,
129
130    /// Number of records rejected
131    pub rejected: usize,
132
133    /// Validation errors (if any)
134    #[serde(skip_serializing_if = "Vec::is_empty")]
135    pub errors: Vec<IngestionError>,
136
137    /// Processing timestamp
138    pub processed_at: DateTime<Utc>,
139}
140
141/// Ingestion status
142#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
143#[serde(rename_all = "lowercase")]
144pub enum IngestionStatus {
145    /// All records accepted
146    Success,
147
148    /// Some records accepted, some rejected
149    Partial,
150
151    /// All records rejected
152    Failed,
153
154    /// Request queued for processing
155    Queued,
156}
157
158/// Ingestion error details
159#[derive(Debug, Clone, Serialize, Deserialize)]
160pub struct IngestionError {
161    /// Record index (for batch requests)
162    pub index: Option<usize>,
163
164    /// Error code
165    pub code: String,
166
167    /// Error message
168    pub message: String,
169
170    /// Field that caused the error (if applicable)
171    pub field: Option<String>,
172}
173
174/// Event stream message envelope
175#[derive(Debug, Clone, Serialize, Deserialize)]
176pub struct StreamMessage {
177    /// Message identifier
178    pub message_id: String,
179
180    /// Event type
181    pub event_type: StreamEventType,
182
183    /// Timestamp when message was created
184    pub created_at: DateTime<Utc>,
185
186    /// Payload data
187    pub payload: UsageWebhookPayload,
188
189    /// Retry count
190    #[serde(default)]
191    pub retry_count: u32,
192}
193
194/// Stream event types
195#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
196#[serde(rename_all = "snake_case")]
197pub enum StreamEventType {
198    /// New usage record
199    UsageCreated,
200
201    /// Usage record updated
202    UsageUpdated,
203
204    /// Batch upload
205    BatchUploaded,
206}
207
208/// Configuration for ingestion sources
209#[derive(Debug, Clone, Serialize, Deserialize)]
210pub struct IngestionConfig {
211    /// Enable webhook endpoint
212    pub webhook_enabled: bool,
213
214    /// Webhook server bind address
215    pub webhook_bind: String,
216
217    /// Enable NATS stream consumer
218    pub nats_enabled: bool,
219
220    /// NATS server URLs
221    pub nats_urls: Vec<String>,
222
223    /// NATS subject to subscribe to
224    pub nats_subject: String,
225
226    /// Enable Redis stream consumer
227    pub redis_enabled: bool,
228
229    /// Redis connection URL
230    pub redis_url: Option<String>,
231
232    /// Redis stream key
233    pub redis_stream_key: String,
234
235    /// Buffer size for incoming requests
236    pub buffer_size: usize,
237
238    /// Maximum batch size
239    pub max_batch_size: usize,
240
241    /// Request timeout in seconds
242    pub request_timeout_secs: u64,
243
244    /// Retry configuration
245    pub retry: RetryConfig,
246}
247
248/// Retry configuration
249#[derive(Debug, Clone, Serialize, Deserialize)]
250pub struct RetryConfig {
251    /// Maximum number of retries
252    pub max_retries: u32,
253
254    /// Initial retry delay in milliseconds
255    pub initial_delay_ms: u64,
256
257    /// Maximum retry delay in milliseconds
258    pub max_delay_ms: u64,
259
260    /// Backoff multiplier
261    pub backoff_multiplier: f64,
262}
263
264impl Default for IngestionConfig {
265    fn default() -> Self {
266        Self {
267            webhook_enabled: true,
268            webhook_bind: "0.0.0.0:8080".to_string(),
269            nats_enabled: false,
270            nats_urls: vec!["nats://localhost:4222".to_string()],
271            nats_subject: "llm.usage".to_string(),
272            redis_enabled: false,
273            redis_url: None,
274            redis_stream_key: "llm:usage".to_string(),
275            buffer_size: 10000,
276            max_batch_size: 1000,
277            request_timeout_secs: 30,
278            retry: RetryConfig::default(),
279        }
280    }
281}
282
283impl Default for RetryConfig {
284    fn default() -> Self {
285        Self {
286            max_retries: 3,
287            initial_delay_ms: 100,
288            max_delay_ms: 30000,
289            backoff_multiplier: 2.0,
290        }
291    }
292}
293
294impl UsageWebhookPayload {
295    /// Convert webhook payload to domain UsageRecord
296    pub fn to_usage_record(&self) -> llm_cost_ops::UsageRecord {
297        use llm_cost_ops::domain::IngestionSource;
298
299        llm_cost_ops::UsageRecord {
300            id: self.request_id,
301            timestamp: self.timestamp,
302            provider: Provider::parse(&self.provider),
303            model: ModelIdentifier {
304                name: self.model.name.clone(),
305                version: self.model.version.clone(),
306                context_window: self.model.context_window,
307            },
308            organization_id: self.organization_id.clone(),
309            project_id: self.project_id.clone(),
310            user_id: self.user_id.clone(),
311            prompt_tokens: self.usage.prompt_tokens,
312            completion_tokens: self.usage.completion_tokens,
313            total_tokens: self.usage.total_tokens,
314            cached_tokens: self.usage.cached_tokens,
315            reasoning_tokens: self.usage.reasoning_tokens,
316            latency_ms: self.performance.as_ref().and_then(|p| p.latency_ms),
317            time_to_first_token_ms: self
318                .performance
319                .as_ref()
320                .and_then(|p| p.time_to_first_token_ms),
321            tags: self.tags.clone(),
322            metadata: serde_json::to_value(&self.metadata).unwrap_or(serde_json::json!({})),
323            ingested_at: Utc::now(),
324            source: IngestionSource::Webhook {
325                endpoint: "/v1/usage".to_string(),
326            },
327        }
328    }
329}