switchgear_components/offer/
memory.rs1use crate::offer::error::OfferStoreError;
2use async_trait::async_trait;
3use std::collections::HashMap;
4use std::sync::Arc;
5use switchgear_service_api::offer::{OfferMetadata, OfferMetadataStore, OfferRecord, OfferStore};
6use tokio::sync::Mutex;
7use uuid::Uuid;
8
9#[derive(Clone, Debug)]
10struct OfferRecordTimestamped {
11 created: chrono::DateTime<chrono::Utc>,
12 offer: OfferRecord,
13}
14
15#[derive(Clone, Debug)]
16struct OfferMetadataTimestamped {
17 created: chrono::DateTime<chrono::Utc>,
18 metadata: OfferMetadata,
19}
20
21#[derive(Clone, Debug)]
22pub struct MemoryOfferStore {
23 offer: Arc<Mutex<HashMap<(String, Uuid), OfferRecordTimestamped>>>,
24 metadata: Arc<Mutex<HashMap<(String, Uuid), OfferMetadataTimestamped>>>,
25}
26
27impl MemoryOfferStore {
28 pub fn new() -> Self {
29 Self {
30 offer: Arc::new(Mutex::new(HashMap::new())),
31 metadata: Arc::new(Mutex::new(HashMap::new())),
32 }
33 }
34}
35
36impl Default for MemoryOfferStore {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42#[async_trait]
43impl OfferStore for MemoryOfferStore {
44 type Error = OfferStoreError;
45
46 async fn get_offer(
47 &self,
48 partition: &str,
49 id: &Uuid,
50 sparse: Option<bool>,
51 ) -> Result<Option<OfferRecord>, Self::Error> {
52 let sparse = sparse.unwrap_or(true);
53 let metadata_store = self.metadata.lock().await;
54 let store = self.offer.lock().await;
55
56 Ok(store.get(&(partition.to_string(), *id)).and_then(|offer| {
57 if sparse {
58 Some(offer.offer.clone())
59 } else {
60 metadata_store
61 .get(&(partition.to_string(), offer.offer.offer.metadata_id))
62 .map(|metadata| {
63 let mut offer = offer.offer.clone();
64 offer.offer.metadata = Some(metadata.metadata.metadata.clone());
65 offer
66 })
67 }
68 }))
69 }
70
71 async fn get_offers(
72 &self,
73 partition: &str,
74 start: usize,
75 count: usize,
76 ) -> Result<Vec<OfferRecord>, Self::Error> {
77 let store = self.offer.lock().await;
78 let mut offers: Vec<OfferRecordTimestamped> = store
79 .iter()
80 .filter(|((p, _), _)| p == partition)
81 .map(|(_, offer)| offer.clone())
82 .collect();
83
84 offers.sort_by(|a, b| {
85 a.created
86 .cmp(&b.created)
87 .then_with(|| a.offer.id.cmp(&b.offer.id))
88 });
89
90 let offers = offers
91 .into_iter()
92 .skip(start)
93 .take(count)
94 .map(|o| o.offer)
95 .collect();
96
97 Ok(offers)
98 }
99
100 async fn post_offer(&self, offer: OfferRecord) -> Result<Option<Uuid>, Self::Error> {
101 let metadata_store = self.metadata.lock().await;
102 let mut store = self.offer.lock().await;
103
104 if !metadata_store.contains_key(&(offer.partition.to_string(), offer.offer.metadata_id)) {
105 return Err(OfferStoreError::invalid_input_error(
106 format!("post offer {offer:?}"),
107 format!(
108 "metadata {} not found for offer {}",
109 offer.offer.metadata_id, offer.id
110 ),
111 ));
112 }
113
114 if let std::collections::hash_map::Entry::Vacant(e) =
115 store.entry((offer.partition.to_string(), offer.id))
116 {
117 e.insert(OfferRecordTimestamped {
118 created: chrono::Utc::now(),
119 offer: offer.clone(),
120 });
121 Ok(Some(offer.id))
122 } else {
123 Ok(None)
124 }
125 }
126
127 async fn put_offer(&self, offer: OfferRecord) -> Result<bool, Self::Error> {
128 let metadata_store = self.metadata.lock().await;
129 let mut store = self.offer.lock().await;
130
131 if !metadata_store.contains_key(&(offer.partition.to_string(), offer.offer.metadata_id)) {
132 return Err(OfferStoreError::invalid_input_error(
133 format!("put offer {offer:?}"),
134 format!(
135 "metadata {} not found for offer {}",
136 offer.offer.metadata_id, offer.id
137 ),
138 ));
139 }
140
141 let was_new = store
142 .insert(
143 (offer.partition.to_string(), offer.id),
144 OfferRecordTimestamped {
145 created: chrono::Utc::now(),
146 offer,
147 },
148 )
149 .is_none();
150 Ok(was_new)
151 }
152
153 async fn delete_offer(&self, partition: &str, id: &Uuid) -> Result<bool, Self::Error> {
154 let mut store = self.offer.lock().await;
155 Ok(store.remove(&(partition.to_string(), *id)).is_some())
156 }
157}
158
159#[async_trait]
160impl OfferMetadataStore for MemoryOfferStore {
161 type Error = OfferStoreError;
162
163 async fn get_metadata(
164 &self,
165 partition: &str,
166 id: &Uuid,
167 ) -> Result<Option<OfferMetadata>, Self::Error> {
168 let store = self.metadata.lock().await;
169 Ok(store
170 .get(&(partition.to_string(), *id))
171 .map(|o| o.metadata.clone()))
172 }
173
174 async fn get_all_metadata(
175 &self,
176 partition: &str,
177 start: usize,
178 count: usize,
179 ) -> Result<Vec<OfferMetadata>, Self::Error> {
180 let store = self.metadata.lock().await;
181 let mut metadata: Vec<OfferMetadataTimestamped> = store
182 .iter()
183 .filter(|((p, _), _)| p == partition)
184 .map(|(_, metadata)| metadata.clone())
185 .collect();
186
187 metadata.sort_by(|a, b| {
188 a.created
189 .cmp(&b.created)
190 .then_with(|| a.metadata.id.cmp(&b.metadata.id))
191 });
192
193 let metadata = metadata
194 .into_iter()
195 .skip(start)
196 .take(count)
197 .map(|o| o.metadata)
198 .collect();
199
200 Ok(metadata)
201 }
202
203 async fn post_metadata(&self, metadata: OfferMetadata) -> Result<Option<Uuid>, Self::Error> {
204 let mut store = self.metadata.lock().await;
205 if let std::collections::hash_map::Entry::Vacant(e) =
206 store.entry((metadata.partition.to_string(), metadata.id))
207 {
208 e.insert(OfferMetadataTimestamped {
209 created: chrono::Utc::now(),
210 metadata: metadata.clone(),
211 });
212
213 Ok(Some(metadata.id))
214 } else {
215 Ok(None)
216 }
217 }
218
219 async fn put_metadata(&self, metadata: OfferMetadata) -> Result<bool, Self::Error> {
220 let mut store = self.metadata.lock().await;
221 let was_new = store
222 .insert(
223 (metadata.partition.to_string(), metadata.id),
224 OfferMetadataTimestamped {
225 created: chrono::Utc::now(),
226 metadata,
227 },
228 )
229 .is_none();
230 Ok(was_new)
231 }
232
233 async fn delete_metadata(&self, partition: &str, id: &Uuid) -> Result<bool, Self::Error> {
234 let offer_store = self.offer.lock().await;
235 let mut metadata_store = self.metadata.lock().await;
236
237 let metadata_in_use = offer_store.values().any(|offer| {
238 offer.offer.partition == partition && offer.offer.offer.metadata_id == *id
239 });
240
241 if metadata_in_use {
242 return Err(OfferStoreError::invalid_input_error(
243 format!("delete metadata {partition}/{id}"),
244 format!("metadata {} is referenced by existing offers", id),
245 ));
246 }
247
248 Ok(metadata_store
249 .remove(&(partition.to_string(), *id))
250 .is_some())
251 }
252}