latch_billing/storage.rs
1//! Storage module - defines traits for persisting observations and rated records.
2//!
3//! **Design note**: Both `ObservationStore` and `RatedRecordStore` are
4//! **sync** traits. `latch-billing` does not expose async traits.
5//!
6//! For async storage (e.g., Postgres via sqlx), downstream consumers should
7//! implement their own async buffer layer using these sync traits as the foundation.
8
9use crate::observation::UsageObservation;
10use crate::rating::RatedUsageRecord;
11
12/// Trait for appending raw usage observations.
13///
14/// Observations are **immutable facts** - they should never be updated
15/// or deleted, only appended. This is the "append-only event stream"
16/// pattern.
17///
18/// # Idempotency
19///
20/// Same `UsageEventId` repeated append should:
21/// - Be treated as success (not an error)
22/// - Side effects happen only once (no double counting)
23/// - Return `StoreResult::AlreadyExists` for callers to observe
24///
25/// # Design
26///
27/// - Sync trait: implementors should use in-memory storage or
28/// implement their own async buffer layer in the application.
29/// - Fail-open semantics: callers should not fail if storage fails.
30///
31/// # Example
32///
33/// ```rust,ignore
34/// struct InMemoryStore {
35/// observations: Mutex<Vec<UsageObservation>>,
36/// }
37///
38/// impl ObservationStore for InMemoryStore {
39/// fn append_observation(&self, obs: UsageObservation) -> Result<StoreResult, StoreError> {
40/// self.observations.lock().unwrap().push(obs);
41/// Ok(StoreResult::Appended)
42/// }
43/// }
44/// ```
45pub trait ObservationStore: Send + Sync {
46 /// Append a single observation to the store.
47 ///
48 /// # Errors
49 ///
50 /// Returns `StoreError` if the observation cannot be persisted.
51 /// Callers should implement their own retry logic at the application layer.
52 fn append_observation(&self, observation: UsageObservation) -> Result<StoreResult, StoreError>;
53}
54
55/// Trait for appending rated usage records.
56///
57/// Rated records are the output of the rating engine.
58/// They can be stored separately from (or in addition to) raw observations.
59///
60/// # When to use which pattern
61///
62/// | Pattern | Write | Use case |
63/// |---------|-------|----------|
64/// | Observations only | `append_observation` | Offline rating, facts never lost |
65/// | Rated records only | `append_rated_record` | Inline rating, no raw data needed |
66/// | Both | Both | Audit + replay (recommended for xrouter) |
67pub trait RatedRecordStore: Send + Sync {
68 /// Append a single rated record to the store.
69 fn append_rated_record(&self, record: RatedUsageRecord) -> Result<StoreResult, StoreError>;
70}
71
72/// Result of a store operation (for idempotency).
73///
74/// Same `UsageEventId` repeated append should return `AlreadyExists`
75/// (side effects happen only once).
76#[derive(Debug, Clone, PartialEq)]
77pub enum StoreResult {
78 /// First-time write.
79 Appended,
80 /// Idempotent duplicate: already exists, not re-written.
81 AlreadyExists,
82}
83
84/// Error type for storage operations.
85#[derive(Debug, Clone)]
86pub enum StoreError {
87 /// Connection/network error (retryable).
88 ConnectionError(String),
89
90 /// Data integrity error (not retryable).
91 IntegrityError(String),
92
93 /// Serialization/deserialization error.
94 SerializationError(String),
95
96 /// Generic error.
97 Other(String),
98}
99
100// Implement a simple display for StoreError
101impl std::fmt::Display for StoreError {
102 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103 match self {
104 StoreError::ConnectionError(e) => write!(f, "Connection error: {e}"),
105 StoreError::IntegrityError(e) => write!(f, "Integrity error: {e}"),
106 StoreError::SerializationError(e) => write!(f, "Serialization error: {e}"),
107 StoreError::Other(e) => write!(f, "Store error: {e}"),
108 }
109 }
110}
111
112impl std::error::Error for StoreError {}
113
114#[cfg(test)]
115mod tests {
116 use super::*;
117 use crate::identity::UsageEventId;
118 use crate::observation::{Attributes, MeterSet, UsageObservation, UsageOutcome, UsageSource, UsageTiming};
119 use crate::pricing::ModelRef;
120 use chrono::Utc;
121 use std::sync::{Arc, Mutex};
122
123 /// Simple in-memory store for testing.
124 struct InMemoryObservationStore {
125 observations: Arc<Mutex<Vec<UsageObservation>>>,
126 }
127
128 impl InMemoryObservationStore {
129 fn new() -> Self {
130 Self {
131 observations: Arc::new(Mutex::new(Vec::new())),
132 }
133 }
134 }
135
136 impl ObservationStore for InMemoryObservationStore {
137 fn append_observation(&self, observation: UsageObservation) -> Result<StoreResult, StoreError> {
138 self.observations.lock().unwrap().push(observation);
139 Ok(StoreResult::Appended)
140 }
141 }
142
143 #[test]
144 fn in_memory_store_appends() {
145 let store = InMemoryObservationStore::new();
146 let obs = UsageObservation {
147 event_id: UsageEventId::from_raw("test-1"),
148 subject: crate::identity::BillingSubject::default(),
149 meter_set: MeterSet::new(),
150 model_ref: ModelRef {
151 billable_model: "test".to_string(),
152 vendor: None,
153 region: None,
154 tier: None,
155 },
156 provider_ref: None,
157 source: UsageSource::Estimated,
158 outcome: UsageOutcome::Success,
159 timing: UsageTiming {
160 observed_at: Utc::now(),
161 completed_at: None,
162 },
163 correlation: crate::identity::CorrelationIds::default(),
164 attributes: Attributes::new(),
165 };
166
167 let result = store.append_observation(obs).unwrap();
168 assert_eq!(result, StoreResult::Appended);
169 assert_eq!(store.observations.lock().unwrap().len(), 1);
170 }
171
172 #[test]
173 fn store_error_display() {
174 let err = StoreError::ConnectionError("db down".to_string());
175 assert_eq!(err.to_string(), "Connection error: db down");
176 }
177
178 #[test]
179 fn store_result_equality() {
180 assert_eq!(StoreResult::Appended, StoreResult::Appended);
181 assert_eq!(StoreResult::AlreadyExists, StoreResult::AlreadyExists);
182 assert_ne!(StoreResult::Appended, StoreResult::AlreadyExists);
183 }
184}