1use crate::discovery::db::Column;
2use crate::offer::db_orm::prelude::*;
3use crate::offer::db_orm::{offer_metadata_table, offer_record_table};
4use crate::offer::error::OfferStoreError;
5use async_trait::async_trait;
6use chrono::Utc;
7use sea_orm::{
8 ColumnTrait, Database, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder, QuerySelect,
9 Set,
10};
11use switchgear_migration::OnConflict;
12use switchgear_migration::{Expr, MigratorTrait};
13use switchgear_service_api::offer::{
14 OfferMetadata, OfferMetadataStore, OfferRecord, OfferRecordSparse, OfferStore,
15};
16use switchgear_service_api::service::ServiceErrorSource;
17use uuid::Uuid;
18
/// Offer store backed by a SeaORM [`DatabaseConnection`].
///
/// Implements both [`OfferStore`] (offer records) and [`OfferMetadataStore`]
/// (offer metadata) on top of the same connection.
#[derive(Clone, Debug)]
pub struct DbOfferStore {
    /// Connection handle every query issued by this store runs against.
    db: DatabaseConnection,
}
23
24impl DbOfferStore {
25 pub async fn connect(uri: &str, max_connections: u32) -> Result<Self, OfferStoreError> {
26 let mut opt = sea_orm::ConnectOptions::new(uri);
27 opt.max_connections(max_connections);
28 let db = Database::connect(opt).await.map_err(|e| {
29 OfferStoreError::from_db(
30 ServiceErrorSource::Internal,
31 "connecting to offer database",
32 e,
33 )
34 })?;
35
36 Ok(Self::from_db(db))
37 }
38
39 pub async fn migrate_up(&self) -> Result<(), OfferStoreError> {
40 switchgear_migration::OfferMigrator::up(&self.db, None)
41 .await
42 .map_err(|e| {
43 OfferStoreError::from_db(ServiceErrorSource::Internal, "migrating database up", e)
44 })?;
45 Ok(())
46 }
47
48 pub async fn migrate_down(&self) -> Result<(), OfferStoreError> {
49 switchgear_migration::OfferMigrator::down(&self.db, None)
50 .await
51 .map_err(|e| {
52 OfferStoreError::from_db(ServiceErrorSource::Internal, "migrating database down", e)
53 })?;
54 Ok(())
55 }
56
57 pub fn from_db(db: DatabaseConnection) -> Self {
58 Self { db }
59 }
60}
61
62#[async_trait]
63impl OfferStore for DbOfferStore {
64 type Error = OfferStoreError;
65
66 async fn get_offer(
67 &self,
68 partition: &str,
69 id: &Uuid,
70 sparse: Option<bool>,
71 ) -> Result<Option<OfferRecord>, Self::Error> {
72 let sparse = sparse.unwrap_or(true);
73
74 let result = OfferRecordTable::find_by_id((partition.to_string(), *id))
75 .find_also_related(OfferMetadataTable)
76 .one(&self.db)
77 .await
78 .map_err(|e| {
79 OfferStoreError::from_db(
80 ServiceErrorSource::Internal,
81 format!("getting offer with metadata for partition {partition} id {id}",),
82 e,
83 )
84 })?;
85
86 let (offer_model, metadata_model) = match (result, sparse) {
87 (Some((offer, Some(metadata))), false) => {
88 let metadata = serde_json::from_value(metadata.metadata).map_err(|e| {
89 OfferStoreError::serialization_error(
90 ServiceErrorSource::Internal,
91 format!("deserializing metadata for partition {partition} id {id}",),
92 e,
93 )
94 })?;
95 (offer, Some(metadata))
96 }
97 (Some((offer, _)), true) => (offer, None),
98 _ => return Ok(None),
99 };
100
101 Ok(Some(OfferRecord {
102 partition: offer_model.partition,
103 id: offer_model.id,
104 offer: OfferRecordSparse {
105 max_sendable: offer_model.max_sendable as u64,
106 min_sendable: offer_model.min_sendable as u64,
107 metadata_id: offer_model.metadata_id,
108 metadata: metadata_model,
109 timestamp: offer_model.timestamp.into(),
110 expires: offer_model.expires.map(|dt| dt.into()),
111 },
112 }))
113 }
114
115 async fn get_offers(
116 &self,
117 partition: &str,
118 start: usize,
119 count: usize,
120 ) -> Result<Vec<OfferRecord>, Self::Error> {
121 let models = OfferRecordTable::find()
122 .filter(offer_record_table::Column::Partition.eq(partition))
123 .order_by_asc(offer_record_table::Column::CreatedAt)
124 .order_by_asc(offer_record_table::Column::Id)
125 .offset(start as u64)
126 .limit(count as u64)
127 .all(&self.db)
128 .await
129 .map_err(|e| {
130 OfferStoreError::from_db(
131 ServiceErrorSource::Internal,
132 format!("getting offers for partition {partition}"),
133 e,
134 )
135 })?;
136
137 let mut offers = Vec::new();
138 for model in models {
139 offers.push(OfferRecord {
140 partition: model.partition,
141 id: model.id,
142 offer: OfferRecordSparse {
143 max_sendable: model.max_sendable as u64,
144 min_sendable: model.min_sendable as u64,
145 metadata_id: model.metadata_id,
146 metadata: None,
147 timestamp: model.timestamp.into(),
148 expires: model.expires.map(|dt| dt.into()),
149 },
150 });
151 }
152
153 Ok(offers)
154 }
155
156 async fn post_offer(&self, offer: OfferRecord) -> Result<Option<Uuid>, Self::Error> {
157 let now = Utc::now();
158 let active_model = offer_record_table::ActiveModel {
159 id: Set(offer.id),
160 partition: Set(offer.partition.clone()),
161 max_sendable: Set(offer.offer.max_sendable as i64),
162 min_sendable: Set(offer.offer.min_sendable as i64),
163 metadata_id: Set(offer.offer.metadata_id),
164 timestamp: Set(offer.offer.timestamp.into()),
165 expires: Set(offer.offer.expires.map(|dt| dt.into())),
166 created_at: Set(now.into()),
167 updated_at: Set(now.into()),
168 };
169
170 match OfferRecordTable::insert(active_model).exec(&self.db).await {
171 Ok(_) => Ok(Some(offer.id)),
172 Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
174 db_err,
175 )))) if db_err.is_unique_violation() => Ok(None),
176 Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
178 db_err,
179 )))) if db_err.is_unique_violation() => Ok(None),
180 Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
182 db_err,
183 )))) if db_err.is_foreign_key_violation() => Err(OfferStoreError::invalid_input_error(
184 format!("post offer {offer:?}"),
185 format!(
186 "metadata {} not found for offer {}",
187 offer.offer.metadata_id, offer.id
188 ),
189 )),
190 Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
191 db_err,
192 )))) if db_err.is_foreign_key_violation() => Err(OfferStoreError::invalid_input_error(
193 format!("post offer {offer:?}"),
194 format!(
195 "metadata {} not found for offer {}",
196 offer.offer.metadata_id, offer.id
197 ),
198 )),
199 Err(e) => Err(OfferStoreError::from_db(
200 ServiceErrorSource::Internal,
201 format!(
202 "inserting offer for partition {} id {}",
203 offer.partition, offer.id
204 ),
205 e,
206 )),
207 }
208 }
209
210 async fn put_offer(&self, offer: OfferRecord) -> Result<bool, Self::Error> {
211 let now = Utc::now();
212 let future_timestamp = now + chrono::Duration::seconds(1);
213
214 let active_model = offer_record_table::ActiveModel {
215 id: Set(offer.id),
216 partition: Set(offer.partition.clone()),
217 max_sendable: Set(offer.offer.max_sendable as i64),
218 min_sendable: Set(offer.offer.min_sendable as i64),
219 metadata_id: Set(offer.offer.metadata_id),
220 timestamp: Set(offer.offer.timestamp.into()),
221 expires: Set(offer.offer.expires.map(|dt| dt.into())),
222 created_at: Set(now.into()), updated_at: Set(now.into()),
224 };
225
226 let _result = match OfferRecordTable::insert(active_model)
227 .on_conflict(
228 OnConflict::columns([
229 offer_record_table::Column::Partition,
230 offer_record_table::Column::Id,
231 ])
232 .update_columns([
233 offer_record_table::Column::MaxSendable,
234 offer_record_table::Column::MinSendable,
235 offer_record_table::Column::MetadataId,
236 offer_record_table::Column::Timestamp,
237 offer_record_table::Column::Expires,
238 ])
239 .value(Column::UpdatedAt, Expr::val(future_timestamp))
240 .to_owned(),
241 )
242 .exec(&self.db)
243 .await
244 {
245 Ok(result) => result,
246 Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
248 db_err,
249 )))) if db_err.is_foreign_key_violation() => {
250 return Err(OfferStoreError::invalid_input_error(
251 format!("put offer {offer:?}"),
252 format!(
253 "metadata {} not found for offer {}",
254 offer.offer.metadata_id, offer.id
255 ),
256 ));
257 }
258 Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
259 db_err,
260 )))) if db_err.is_foreign_key_violation() => {
261 return Err(OfferStoreError::invalid_input_error(
262 format!("put offer {offer:?}"),
263 format!(
264 "metadata {} not found for offer {}",
265 offer.offer.metadata_id, offer.id
266 ),
267 ));
268 }
269 Err(e) => {
270 return Err(OfferStoreError::from_db(
271 ServiceErrorSource::Internal,
272 format!(
273 "upserting offer for partition {} id {}",
274 offer.partition, offer.id
275 ),
276 e,
277 ));
278 }
279 };
280
281 let result = OfferRecordTable::find()
283 .filter(offer_record_table::Column::Partition.eq(offer.partition.clone()))
284 .filter(offer_record_table::Column::Id.eq(offer.id))
285 .select_only()
286 .column(offer_record_table::Column::CreatedAt)
287 .column(offer_record_table::Column::UpdatedAt)
288 .into_tuple::<(
289 chrono::DateTime<chrono::FixedOffset>,
290 chrono::DateTime<chrono::FixedOffset>,
291 )>()
292 .one(&self.db)
293 .await
294 .map_err(|e| {
295 OfferStoreError::from_db(
296 ServiceErrorSource::Internal,
297 format!(
298 "fetching offer after upsert for partition {} id {}",
299 offer.partition, offer.id
300 ),
301 e,
302 )
303 })?
304 .ok_or_else(|| {
305 OfferStoreError::from_db(
306 ServiceErrorSource::Internal,
307 "upsert succeeded but record not found",
308 sea_orm::DbErr::RecordNotFound(
309 "Record should exist after successful upsert".to_string(),
310 ),
311 )
312 })?;
313
314 Ok(result.0 == result.1)
316 }
317
318 async fn delete_offer(&self, partition: &str, id: &Uuid) -> Result<bool, Self::Error> {
319 let result = OfferRecordTable::delete_by_id((partition.to_string(), *id))
320 .exec(&self.db)
321 .await
322 .map_err(|e| {
323 OfferStoreError::from_db(
324 ServiceErrorSource::Internal,
325 format!("deleting offer for partition {partition} id {id}"),
326 e,
327 )
328 })?;
329
330 Ok(result.rows_affected > 0)
331 }
332}
333
334#[async_trait]
335impl OfferMetadataStore for DbOfferStore {
336 type Error = OfferStoreError;
337
338 async fn get_metadata(
339 &self,
340 partition: &str,
341 id: &Uuid,
342 ) -> Result<Option<OfferMetadata>, Self::Error> {
343 let model = OfferMetadataTable::find_by_id((partition.to_string(), *id))
344 .one(&self.db)
345 .await
346 .map_err(|e| {
347 OfferStoreError::from_db(
348 ServiceErrorSource::Internal,
349 format!("getting metadata for partition {partition} id {id}"),
350 e,
351 )
352 })?;
353
354 match model {
355 Some(model) => {
356 let metadata = serde_json::from_value(model.metadata).map_err(|e| {
357 OfferStoreError::serialization_error(
358 ServiceErrorSource::Internal,
359 format!("deserializing metadata for partition {partition} id {id}",),
360 e,
361 )
362 })?;
363
364 Ok(Some(OfferMetadata {
365 id: model.id,
366 partition: model.partition,
367 metadata,
368 }))
369 }
370 None => Ok(None),
371 }
372 }
373
374 async fn get_all_metadata(
375 &self,
376 partition: &str,
377 start: usize,
378 count: usize,
379 ) -> Result<Vec<OfferMetadata>, Self::Error> {
380 let models = OfferMetadataTable::find()
381 .filter(offer_metadata_table::Column::Partition.eq(partition))
382 .order_by_asc(offer_metadata_table::Column::CreatedAt)
383 .order_by_asc(offer_metadata_table::Column::Id)
384 .offset(start as u64)
385 .limit(count as u64)
386 .all(&self.db)
387 .await
388 .map_err(|e| {
389 OfferStoreError::from_db(
390 ServiceErrorSource::Internal,
391 format!("getting all metadata for partition {partition}"),
392 e,
393 )
394 })?;
395
396 let mut metadata_list = Vec::new();
397 for model in models {
398 let metadata = serde_json::from_value(model.metadata).map_err(|e| {
399 OfferStoreError::serialization_error(
400 ServiceErrorSource::Internal,
401 format!(
402 "deserializing metadata for partition {} id {}",
403 partition, model.id
404 ),
405 e,
406 )
407 })?;
408
409 metadata_list.push(OfferMetadata {
410 id: model.id,
411 partition: model.partition,
412 metadata,
413 });
414 }
415
416 Ok(metadata_list)
417 }
418
419 async fn post_metadata(&self, offer: OfferMetadata) -> Result<Option<Uuid>, Self::Error> {
420 let metadata_json = serde_json::to_value(&offer.metadata).map_err(|e| {
421 OfferStoreError::serialization_error(
422 ServiceErrorSource::Internal,
423 format!(
424 "serializing metadata for partition {} id {}",
425 offer.partition, offer.id
426 ),
427 e,
428 )
429 })?;
430
431 let now = Utc::now();
432 let active_model = offer_metadata_table::ActiveModel {
433 id: Set(offer.id),
434 partition: Set(offer.partition.clone()),
435 metadata: Set(metadata_json),
436 created_at: Set(now.into()),
437 updated_at: Set(now.into()),
438 };
439
440 match OfferMetadataTable::insert(active_model)
441 .exec(&self.db)
442 .await
443 {
444 Ok(_) => Ok(Some(offer.id)),
445 Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
447 db_err,
448 )))) if db_err.is_unique_violation() => Ok(None),
449 Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
451 db_err,
452 )))) if db_err.is_unique_violation() => Ok(None),
453 Err(e) => Err(OfferStoreError::from_db(
454 ServiceErrorSource::Internal,
455 format!(
456 "inserting metadata for partition {} id {}",
457 offer.partition, offer.id
458 ),
459 e,
460 )),
461 }
462 }
463
464 async fn put_metadata(&self, offer: OfferMetadata) -> Result<bool, Self::Error> {
465 let metadata_json = serde_json::to_value(&offer.metadata).map_err(|e| {
466 OfferStoreError::serialization_error(
467 ServiceErrorSource::Internal,
468 format!(
469 "serializing metadata for partition {} id {}",
470 offer.partition, offer.id
471 ),
472 e,
473 )
474 })?;
475
476 let now = Utc::now();
477 let future_timestamp = now + chrono::Duration::seconds(1);
478
479 let active_model = offer_metadata_table::ActiveModel {
480 id: Set(offer.id),
481 partition: Set(offer.partition.clone()),
482 metadata: Set(metadata_json),
483 created_at: Set(now.into()), updated_at: Set(now.into()),
485 };
486
487 let _result = OfferMetadataTable::insert(active_model)
488 .on_conflict(
489 OnConflict::columns([
490 offer_metadata_table::Column::Partition,
491 offer_metadata_table::Column::Id,
492 ])
493 .update_columns([offer_metadata_table::Column::Metadata])
494 .value(Column::UpdatedAt, Expr::val(future_timestamp))
495 .to_owned(),
496 )
497 .exec(&self.db)
498 .await
499 .map_err(|e| {
500 OfferStoreError::from_db(
501 ServiceErrorSource::Internal,
502 format!(
503 "upserting metadata for partition {} id {}",
504 offer.partition, offer.id
505 ),
506 e,
507 )
508 })?;
509
510 let result = OfferMetadataTable::find()
512 .filter(offer_metadata_table::Column::Partition.eq(offer.partition.clone()))
513 .filter(offer_metadata_table::Column::Id.eq(offer.id))
514 .select_only()
515 .column(offer_metadata_table::Column::CreatedAt)
516 .column(offer_metadata_table::Column::UpdatedAt)
517 .into_tuple::<(
518 chrono::DateTime<chrono::FixedOffset>,
519 chrono::DateTime<chrono::FixedOffset>,
520 )>()
521 .one(&self.db)
522 .await
523 .map_err(|e| {
524 OfferStoreError::from_db(
525 ServiceErrorSource::Internal,
526 format!(
527 "fetching metadata after upsert for partition {} id {}",
528 offer.partition, offer.id
529 ),
530 e,
531 )
532 })?
533 .ok_or_else(|| {
534 OfferStoreError::from_db(
535 ServiceErrorSource::Internal,
536 "upsert succeeded but record not found",
537 sea_orm::DbErr::RecordNotFound(
538 "Record should exist after successful upsert".to_string(),
539 ),
540 )
541 })?;
542
543 Ok(result.0 == result.1)
545 }
546
547 async fn delete_metadata(&self, partition: &str, id: &Uuid) -> Result<bool, Self::Error> {
548 let result = OfferMetadataTable::delete_by_id((partition.to_string(), *id))
549 .exec(&self.db)
550 .await
551 .map_err(|e| match e {
552 sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
553 db_err,
554 ))) if db_err.is_foreign_key_violation()
555 || db_err.code().as_deref() == Some("1811") =>
557 {
558 OfferStoreError::invalid_input_error(
559 format!("deleting metadata for partition {partition} id {id}"),
560 format!("metadata {} is referenced by existing offers", id),
561 )
562 }
563 _ => OfferStoreError::from_db(
564 ServiceErrorSource::Internal,
565 format!("deleting metadata for partition {partition} id {id}"),
566 e,
567 ),
568 })?;
569
570 Ok(result.rows_affected > 0)
571 }
572}