Skip to main content

latch_billing/
storage.rs

1//! Storage module - defines traits for persisting observations and rated records.
2//!
3//! **Design note**: Both `ObservationStore` and `RatedRecordStore` are
4//! **sync** traits. `latch-billing` does not expose async traits.
5//!
6//! For async storage (e.g., Postgres via sqlx), downstream consumers should
7//! implement their own async buffer layer using these sync traits as the foundation.
8
9use crate::observation::UsageObservation;
10use crate::rating::RatedUsageRecord;
11
12/// Trait for appending raw usage observations.
13///
14/// Observations are **immutable facts** - they should never be updated
15/// or deleted, only appended. This is the "append-only event stream"
16/// pattern.
17///
18/// # Idempotency
19///
20/// Same `UsageEventId` repeated append should:
21/// - Be treated as success (not an error)
22/// - Side effects happen only once (no double counting)
23/// - Return `StoreResult::AlreadyExists` for callers to observe
24///
25/// # Design
26///
27/// - Sync trait: implementors should use in-memory storage or
28///   implement their own async buffer layer in the application.
29/// - Fail-open semantics: callers should not fail if storage fails.
30///
31/// # Example
32///
33/// ```rust,ignore
34/// struct InMemoryStore {
35///     observations: Mutex<Vec<UsageObservation>>,
36/// }
37///
38/// impl ObservationStore for InMemoryStore {
39///     fn append_observation(&self, obs: UsageObservation) -> Result<StoreResult, StoreError> {
40///         self.observations.lock().unwrap().push(obs);
41///         Ok(StoreResult::Appended)
42///     }
43/// }
44/// ```
45pub trait ObservationStore: Send + Sync {
46    /// Append a single observation to the store.
47    ///
48    /// # Errors
49    ///
50    /// Returns `StoreError` if the observation cannot be persisted.
51    /// Callers should implement their own retry logic at the application layer.
52    fn append_observation(&self, observation: UsageObservation) -> Result<StoreResult, StoreError>;
53}
54
55/// Trait for appending rated usage records.
56///
57/// Rated records are the output of the rating engine.
58/// They can be stored separately from (or in addition to) raw observations.
59///
60/// # When to use which pattern
61///
62/// | Pattern | Write | Use case |
63/// |---------|-------|----------|
64/// | Observations only | `append_observation` | Offline rating, facts never lost |
65/// | Rated records only | `append_rated_record` | Inline rating, no raw data needed |
66/// | Both | Both | Audit + replay (recommended for xrouter) |
67pub trait RatedRecordStore: Send + Sync {
68    /// Append a single rated record to the store.
69    fn append_rated_record(&self, record: RatedUsageRecord) -> Result<StoreResult, StoreError>;
70}
71
72/// Result of a store operation (for idempotency).
73///
74/// Same `UsageEventId` repeated append should return `AlreadyExists`
75/// (side effects happen only once).
76#[derive(Debug, Clone, PartialEq)]
77pub enum StoreResult {
78    /// First-time write.
79    Appended,
80    /// Idempotent duplicate: already exists, not re-written.
81    AlreadyExists,
82}
83
84/// Error type for storage operations.
85#[derive(Debug, Clone)]
86pub enum StoreError {
87    /// Connection/network error (retryable).
88    ConnectionError(String),
89
90    /// Data integrity error (not retryable).
91    IntegrityError(String),
92
93    /// Serialization/deserialization error.
94    SerializationError(String),
95
96    /// Generic error.
97    Other(String),
98}
99
100// Implement a simple display for StoreError
101impl std::fmt::Display for StoreError {
102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103        match self {
104            StoreError::ConnectionError(e) => write!(f, "Connection error: {e}"),
105            StoreError::IntegrityError(e) => write!(f, "Integrity error: {e}"),
106            StoreError::SerializationError(e) => write!(f, "Serialization error: {e}"),
107            StoreError::Other(e) => write!(f, "Store error: {e}"),
108        }
109    }
110}
111
112impl std::error::Error for StoreError {}
113
114#[cfg(test)]
115mod tests {
116    use super::*;
117    use crate::identity::UsageEventId;
118    use crate::observation::{Attributes, MeterSet, UsageObservation, UsageOutcome, UsageSource, UsageTiming};
119    use crate::pricing::ModelRef;
120    use chrono::Utc;
121    use std::sync::{Arc, Mutex};
122
123    /// Simple in-memory store for testing.
124    struct InMemoryObservationStore {
125        observations: Arc<Mutex<Vec<UsageObservation>>>,
126    }
127
128    impl InMemoryObservationStore {
129        fn new() -> Self {
130            Self {
131                observations: Arc::new(Mutex::new(Vec::new())),
132            }
133        }
134    }
135
136    impl ObservationStore for InMemoryObservationStore {
137        fn append_observation(&self, observation: UsageObservation) -> Result<StoreResult, StoreError> {
138            self.observations.lock().unwrap().push(observation);
139            Ok(StoreResult::Appended)
140        }
141    }
142
143    #[test]
144    fn in_memory_store_appends() {
145        let store = InMemoryObservationStore::new();
146        let obs = UsageObservation {
147            event_id: UsageEventId::from_raw("test-1"),
148            subject: crate::identity::BillingSubject::default(),
149            meter_set: MeterSet::new(),
150            model_ref: ModelRef {
151                billable_model: "test".to_string(),
152                vendor: None,
153                region: None,
154                tier: None,
155            },
156            provider_ref: None,
157            source: UsageSource::Estimated,
158            outcome: UsageOutcome::Success,
159            timing: UsageTiming {
160                observed_at: Utc::now(),
161                completed_at: None,
162            },
163            correlation: crate::identity::CorrelationIds::default(),
164            attributes: Attributes::new(),
165        };
166
167        let result = store.append_observation(obs).unwrap();
168        assert_eq!(result, StoreResult::Appended);
169        assert_eq!(store.observations.lock().unwrap().len(), 1);
170    }
171
172    #[test]
173    fn store_error_display() {
174        let err = StoreError::ConnectionError("db down".to_string());
175        assert_eq!(err.to_string(), "Connection error: db down");
176    }
177
178    #[test]
179    fn store_result_equality() {
180        assert_eq!(StoreResult::Appended, StoreResult::Appended);
181        assert_eq!(StoreResult::AlreadyExists, StoreResult::AlreadyExists);
182        assert_ne!(StoreResult::Appended, StoreResult::AlreadyExists);
183    }
184}