switchgear_components/offer/
db.rs

1use crate::discovery::db::Column;
2use crate::offer::db_orm::prelude::*;
3use crate::offer::db_orm::{offer_metadata_table, offer_record_table};
4use crate::offer::error::OfferStoreError;
5use async_trait::async_trait;
6use chrono::Utc;
7use sea_orm::{
8    ColumnTrait, Database, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder, QuerySelect,
9    Set,
10};
11use switchgear_migration::OnConflict;
12use switchgear_migration::{Expr, MigratorTrait};
13use switchgear_service_api::offer::{
14    OfferMetadata, OfferMetadataStore, OfferRecord, OfferRecordSparse, OfferStore,
15};
16use switchgear_service_api::service::ServiceErrorSource;
17use uuid::Uuid;
18
19#[derive(Clone, Debug)]
20pub struct DbOfferStore {
21    db: DatabaseConnection,
22}
23
24impl DbOfferStore {
25    pub async fn connect(uri: &str, max_connections: u32) -> Result<Self, OfferStoreError> {
26        let mut opt = sea_orm::ConnectOptions::new(uri);
27        opt.max_connections(max_connections);
28        let db = Database::connect(opt).await.map_err(|e| {
29            OfferStoreError::from_db(
30                ServiceErrorSource::Internal,
31                "connecting to offer database",
32                e,
33            )
34        })?;
35
36        Ok(Self::from_db(db))
37    }
38
39    pub async fn migrate_up(&self) -> Result<(), OfferStoreError> {
40        switchgear_migration::OfferMigrator::up(&self.db, None)
41            .await
42            .map_err(|e| {
43                OfferStoreError::from_db(ServiceErrorSource::Internal, "migrating database up", e)
44            })?;
45        Ok(())
46    }
47
48    pub async fn migrate_down(&self) -> Result<(), OfferStoreError> {
49        switchgear_migration::OfferMigrator::down(&self.db, None)
50            .await
51            .map_err(|e| {
52                OfferStoreError::from_db(ServiceErrorSource::Internal, "migrating database down", e)
53            })?;
54        Ok(())
55    }
56
57    pub fn from_db(db: DatabaseConnection) -> Self {
58        Self { db }
59    }
60}
61
62#[async_trait]
63impl OfferStore for DbOfferStore {
64    type Error = OfferStoreError;
65
66    async fn get_offer(
67        &self,
68        partition: &str,
69        id: &Uuid,
70        sparse: Option<bool>,
71    ) -> Result<Option<OfferRecord>, Self::Error> {
72        let sparse = sparse.unwrap_or(true);
73
74        let result = OfferRecordTable::find_by_id((partition.to_string(), *id))
75            .find_also_related(OfferMetadataTable)
76            .one(&self.db)
77            .await
78            .map_err(|e| {
79                OfferStoreError::from_db(
80                    ServiceErrorSource::Internal,
81                    format!("getting offer with metadata for partition {partition} id {id}",),
82                    e,
83                )
84            })?;
85
86        let (offer_model, metadata_model) = match (result, sparse) {
87            (Some((offer, Some(metadata))), false) => {
88                let metadata = serde_json::from_value(metadata.metadata).map_err(|e| {
89                    OfferStoreError::serialization_error(
90                        ServiceErrorSource::Internal,
91                        format!("deserializing metadata for partition {partition} id {id}",),
92                        e,
93                    )
94                })?;
95                (offer, Some(metadata))
96            }
97            (Some((offer, _)), true) => (offer, None),
98            _ => return Ok(None),
99        };
100
101        Ok(Some(OfferRecord {
102            partition: offer_model.partition,
103            id: offer_model.id,
104            offer: OfferRecordSparse {
105                max_sendable: offer_model.max_sendable as u64,
106                min_sendable: offer_model.min_sendable as u64,
107                metadata_id: offer_model.metadata_id,
108                metadata: metadata_model,
109                timestamp: offer_model.timestamp.into(),
110                expires: offer_model.expires.map(|dt| dt.into()),
111            },
112        }))
113    }
114
115    async fn get_offers(
116        &self,
117        partition: &str,
118        start: usize,
119        count: usize,
120    ) -> Result<Vec<OfferRecord>, Self::Error> {
121        let models = OfferRecordTable::find()
122            .filter(offer_record_table::Column::Partition.eq(partition))
123            .order_by_asc(offer_record_table::Column::CreatedAt)
124            .order_by_asc(offer_record_table::Column::Id)
125            .offset(start as u64)
126            .limit(count as u64)
127            .all(&self.db)
128            .await
129            .map_err(|e| {
130                OfferStoreError::from_db(
131                    ServiceErrorSource::Internal,
132                    format!("getting offers for partition {partition}"),
133                    e,
134                )
135            })?;
136
137        let mut offers = Vec::new();
138        for model in models {
139            offers.push(OfferRecord {
140                partition: model.partition,
141                id: model.id,
142                offer: OfferRecordSparse {
143                    max_sendable: model.max_sendable as u64,
144                    min_sendable: model.min_sendable as u64,
145                    metadata_id: model.metadata_id,
146                    metadata: None,
147                    timestamp: model.timestamp.into(),
148                    expires: model.expires.map(|dt| dt.into()),
149                },
150            });
151        }
152
153        Ok(offers)
154    }
155
156    async fn post_offer(&self, offer: OfferRecord) -> Result<Option<Uuid>, Self::Error> {
157        let now = Utc::now();
158        let active_model = offer_record_table::ActiveModel {
159            id: Set(offer.id),
160            partition: Set(offer.partition.clone()),
161            max_sendable: Set(offer.offer.max_sendable as i64),
162            min_sendable: Set(offer.offer.min_sendable as i64),
163            metadata_id: Set(offer.offer.metadata_id),
164            timestamp: Set(offer.offer.timestamp.into()),
165            expires: Set(offer.offer.expires.map(|dt| dt.into())),
166            created_at: Set(now.into()),
167            updated_at: Set(now.into()),
168        };
169
170        match OfferRecordTable::insert(active_model).exec(&self.db).await {
171            Ok(_) => Ok(Some(offer.id)),
172            // PostgreSQL unique constraint violation
173            Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
174                db_err,
175            )))) if db_err.is_unique_violation() => Ok(None),
176            // SQLite unique constraint violation
177            Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
178                db_err,
179            )))) if db_err.is_unique_violation() => Ok(None),
180            // Foreign key constraint violation (metadata_id doesn't exist)
181            Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
182                db_err,
183            )))) if db_err.is_foreign_key_violation() => Err(OfferStoreError::invalid_input_error(
184                format!("post offer {offer:?}"),
185                format!(
186                    "metadata {} not found for offer {}",
187                    offer.offer.metadata_id, offer.id
188                ),
189            )),
190            Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
191                db_err,
192            )))) if db_err.is_foreign_key_violation() => Err(OfferStoreError::invalid_input_error(
193                format!("post offer {offer:?}"),
194                format!(
195                    "metadata {} not found for offer {}",
196                    offer.offer.metadata_id, offer.id
197                ),
198            )),
199            Err(e) => Err(OfferStoreError::from_db(
200                ServiceErrorSource::Internal,
201                format!(
202                    "inserting offer for partition {} id {}",
203                    offer.partition, offer.id
204                ),
205                e,
206            )),
207        }
208    }
209
210    async fn put_offer(&self, offer: OfferRecord) -> Result<bool, Self::Error> {
211        let now = Utc::now();
212        let future_timestamp = now + chrono::Duration::seconds(1);
213
214        let active_model = offer_record_table::ActiveModel {
215            id: Set(offer.id),
216            partition: Set(offer.partition.clone()),
217            max_sendable: Set(offer.offer.max_sendable as i64),
218            min_sendable: Set(offer.offer.min_sendable as i64),
219            metadata_id: Set(offer.offer.metadata_id),
220            timestamp: Set(offer.offer.timestamp.into()),
221            expires: Set(offer.offer.expires.map(|dt| dt.into())),
222            created_at: Set(now.into()), // Set for initial insert
223            updated_at: Set(now.into()),
224        };
225
226        let _result = match OfferRecordTable::insert(active_model)
227            .on_conflict(
228                OnConflict::columns([
229                    offer_record_table::Column::Partition,
230                    offer_record_table::Column::Id,
231                ])
232                .update_columns([
233                    offer_record_table::Column::MaxSendable,
234                    offer_record_table::Column::MinSendable,
235                    offer_record_table::Column::MetadataId,
236                    offer_record_table::Column::Timestamp,
237                    offer_record_table::Column::Expires,
238                ])
239                .value(Column::UpdatedAt, Expr::val(future_timestamp))
240                .to_owned(),
241            )
242            .exec(&self.db)
243            .await
244        {
245            Ok(result) => result,
246            // Foreign key constraint violation (metadata_id doesn't exist)
247            Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
248                db_err,
249            )))) if db_err.is_foreign_key_violation() => {
250                return Err(OfferStoreError::invalid_input_error(
251                    format!("put offer {offer:?}"),
252                    format!(
253                        "metadata {} not found for offer {}",
254                        offer.offer.metadata_id, offer.id
255                    ),
256                ));
257            }
258            Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
259                db_err,
260            )))) if db_err.is_foreign_key_violation() => {
261                return Err(OfferStoreError::invalid_input_error(
262                    format!("put offer {offer:?}"),
263                    format!(
264                        "metadata {} not found for offer {}",
265                        offer.offer.metadata_id, offer.id
266                    ),
267                ));
268            }
269            Err(e) => {
270                return Err(OfferStoreError::from_db(
271                    ServiceErrorSource::Internal,
272                    format!(
273                        "upserting offer for partition {} id {}",
274                        offer.partition, offer.id
275                    ),
276                    e,
277                ));
278            }
279        };
280
281        // Fetch only the timestamps to compare
282        let result = OfferRecordTable::find()
283            .filter(offer_record_table::Column::Partition.eq(offer.partition.clone()))
284            .filter(offer_record_table::Column::Id.eq(offer.id))
285            .select_only()
286            .column(offer_record_table::Column::CreatedAt)
287            .column(offer_record_table::Column::UpdatedAt)
288            .into_tuple::<(
289                chrono::DateTime<chrono::FixedOffset>,
290                chrono::DateTime<chrono::FixedOffset>,
291            )>()
292            .one(&self.db)
293            .await
294            .map_err(|e| {
295                OfferStoreError::from_db(
296                    ServiceErrorSource::Internal,
297                    format!(
298                        "fetching offer after upsert for partition {} id {}",
299                        offer.partition, offer.id
300                    ),
301                    e,
302                )
303            })?
304            .ok_or_else(|| {
305                OfferStoreError::from_db(
306                    ServiceErrorSource::Internal,
307                    "upsert succeeded but record not found",
308                    sea_orm::DbErr::RecordNotFound(
309                        "Record should exist after successful upsert".to_string(),
310                    ),
311                )
312            })?;
313
314        // Compare timestamps to determine if it was insert (true) or update (false)
315        Ok(result.0 == result.1)
316    }
317
318    async fn delete_offer(&self, partition: &str, id: &Uuid) -> Result<bool, Self::Error> {
319        let result = OfferRecordTable::delete_by_id((partition.to_string(), *id))
320            .exec(&self.db)
321            .await
322            .map_err(|e| {
323                OfferStoreError::from_db(
324                    ServiceErrorSource::Internal,
325                    format!("deleting offer for partition {partition} id {id}"),
326                    e,
327                )
328            })?;
329
330        Ok(result.rows_affected > 0)
331    }
332}
333
334#[async_trait]
335impl OfferMetadataStore for DbOfferStore {
336    type Error = OfferStoreError;
337
338    async fn get_metadata(
339        &self,
340        partition: &str,
341        id: &Uuid,
342    ) -> Result<Option<OfferMetadata>, Self::Error> {
343        let model = OfferMetadataTable::find_by_id((partition.to_string(), *id))
344            .one(&self.db)
345            .await
346            .map_err(|e| {
347                OfferStoreError::from_db(
348                    ServiceErrorSource::Internal,
349                    format!("getting metadata for partition {partition} id {id}"),
350                    e,
351                )
352            })?;
353
354        match model {
355            Some(model) => {
356                let metadata = serde_json::from_value(model.metadata).map_err(|e| {
357                    OfferStoreError::serialization_error(
358                        ServiceErrorSource::Internal,
359                        format!("deserializing metadata for partition {partition} id {id}",),
360                        e,
361                    )
362                })?;
363
364                Ok(Some(OfferMetadata {
365                    id: model.id,
366                    partition: model.partition,
367                    metadata,
368                }))
369            }
370            None => Ok(None),
371        }
372    }
373
374    async fn get_all_metadata(
375        &self,
376        partition: &str,
377        start: usize,
378        count: usize,
379    ) -> Result<Vec<OfferMetadata>, Self::Error> {
380        let models = OfferMetadataTable::find()
381            .filter(offer_metadata_table::Column::Partition.eq(partition))
382            .order_by_asc(offer_metadata_table::Column::CreatedAt)
383            .order_by_asc(offer_metadata_table::Column::Id)
384            .offset(start as u64)
385            .limit(count as u64)
386            .all(&self.db)
387            .await
388            .map_err(|e| {
389                OfferStoreError::from_db(
390                    ServiceErrorSource::Internal,
391                    format!("getting all metadata for partition {partition}"),
392                    e,
393                )
394            })?;
395
396        let mut metadata_list = Vec::new();
397        for model in models {
398            let metadata = serde_json::from_value(model.metadata).map_err(|e| {
399                OfferStoreError::serialization_error(
400                    ServiceErrorSource::Internal,
401                    format!(
402                        "deserializing metadata for partition {} id {}",
403                        partition, model.id
404                    ),
405                    e,
406                )
407            })?;
408
409            metadata_list.push(OfferMetadata {
410                id: model.id,
411                partition: model.partition,
412                metadata,
413            });
414        }
415
416        Ok(metadata_list)
417    }
418
419    async fn post_metadata(&self, offer: OfferMetadata) -> Result<Option<Uuid>, Self::Error> {
420        let metadata_json = serde_json::to_value(&offer.metadata).map_err(|e| {
421            OfferStoreError::serialization_error(
422                ServiceErrorSource::Internal,
423                format!(
424                    "serializing metadata for partition {} id {}",
425                    offer.partition, offer.id
426                ),
427                e,
428            )
429        })?;
430
431        let now = Utc::now();
432        let active_model = offer_metadata_table::ActiveModel {
433            id: Set(offer.id),
434            partition: Set(offer.partition.clone()),
435            metadata: Set(metadata_json),
436            created_at: Set(now.into()),
437            updated_at: Set(now.into()),
438        };
439
440        match OfferMetadataTable::insert(active_model)
441            .exec(&self.db)
442            .await
443        {
444            Ok(_) => Ok(Some(offer.id)),
445            // PostgreSQL unique constraint violation
446            Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
447                db_err,
448            )))) if db_err.is_unique_violation() => Ok(None),
449            // SQLite unique constraint violation
450            Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
451                db_err,
452            )))) if db_err.is_unique_violation() => Ok(None),
453            Err(e) => Err(OfferStoreError::from_db(
454                ServiceErrorSource::Internal,
455                format!(
456                    "inserting metadata for partition {} id {}",
457                    offer.partition, offer.id
458                ),
459                e,
460            )),
461        }
462    }
463
464    async fn put_metadata(&self, offer: OfferMetadata) -> Result<bool, Self::Error> {
465        let metadata_json = serde_json::to_value(&offer.metadata).map_err(|e| {
466            OfferStoreError::serialization_error(
467                ServiceErrorSource::Internal,
468                format!(
469                    "serializing metadata for partition {} id {}",
470                    offer.partition, offer.id
471                ),
472                e,
473            )
474        })?;
475
476        let now = Utc::now();
477        let future_timestamp = now + chrono::Duration::seconds(1);
478
479        let active_model = offer_metadata_table::ActiveModel {
480            id: Set(offer.id),
481            partition: Set(offer.partition.clone()),
482            metadata: Set(metadata_json),
483            created_at: Set(now.into()), // Set for initial insert
484            updated_at: Set(now.into()),
485        };
486
487        let _result = OfferMetadataTable::insert(active_model)
488            .on_conflict(
489                OnConflict::columns([
490                    offer_metadata_table::Column::Partition,
491                    offer_metadata_table::Column::Id,
492                ])
493                .update_columns([offer_metadata_table::Column::Metadata])
494                .value(Column::UpdatedAt, Expr::val(future_timestamp))
495                .to_owned(),
496            )
497            .exec(&self.db)
498            .await
499            .map_err(|e| {
500                OfferStoreError::from_db(
501                    ServiceErrorSource::Internal,
502                    format!(
503                        "upserting metadata for partition {} id {}",
504                        offer.partition, offer.id
505                    ),
506                    e,
507                )
508            })?;
509
510        // Fetch only the timestamps to compare
511        let result = OfferMetadataTable::find()
512            .filter(offer_metadata_table::Column::Partition.eq(offer.partition.clone()))
513            .filter(offer_metadata_table::Column::Id.eq(offer.id))
514            .select_only()
515            .column(offer_metadata_table::Column::CreatedAt)
516            .column(offer_metadata_table::Column::UpdatedAt)
517            .into_tuple::<(
518                chrono::DateTime<chrono::FixedOffset>,
519                chrono::DateTime<chrono::FixedOffset>,
520            )>()
521            .one(&self.db)
522            .await
523            .map_err(|e| {
524                OfferStoreError::from_db(
525                    ServiceErrorSource::Internal,
526                    format!(
527                        "fetching metadata after upsert for partition {} id {}",
528                        offer.partition, offer.id
529                    ),
530                    e,
531                )
532            })?
533            .ok_or_else(|| {
534                OfferStoreError::from_db(
535                    ServiceErrorSource::Internal,
536                    "upsert succeeded but record not found",
537                    sea_orm::DbErr::RecordNotFound(
538                        "Record should exist after successful upsert".to_string(),
539                    ),
540                )
541            })?;
542
543        // Compare timestamps to determine if it was insert (true) or update (false)
544        Ok(result.0 == result.1)
545    }
546
547    async fn delete_metadata(&self, partition: &str, id: &Uuid) -> Result<bool, Self::Error> {
548        let result = OfferMetadataTable::delete_by_id((partition.to_string(), *id))
549            .exec(&self.db)
550            .await
551            .map_err(|e| match e {
552                sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
553                    db_err,
554                ))) if db_err.is_foreign_key_violation()
555                    // sqlite
556                    || db_err.code().as_deref() == Some("1811") =>
557                {
558                    OfferStoreError::invalid_input_error(
559                        format!("deleting metadata for partition {partition} id {id}"),
560                        format!("metadata {} is referenced by existing offers", id),
561                    )
562                }
563                _ => OfferStoreError::from_db(
564                    ServiceErrorSource::Internal,
565                    format!("deleting metadata for partition {partition} id {id}"),
566                    e,
567                ),
568            })?;
569
570        Ok(result.rows_affected > 0)
571    }
572}