switchgear_service/components/offer/
db.rs

1use crate::api::offer::{
2    Offer, OfferMetadata, OfferMetadataStore, OfferProvider, OfferRecord, OfferStore,
3};
4use crate::api::service::ServiceErrorSource;
5use crate::components::discovery::db::Column;
6use crate::components::offer::db_orm::prelude::*;
7use crate::components::offer::db_orm::{offer_metadata_table, offer_record_table};
8use crate::components::offer::error::OfferStoreError;
9use async_trait::async_trait;
10use chrono::Utc;
11use sea_orm::{
12    ColumnTrait, Database, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder, QuerySelect,
13    Set,
14};
15use sha2::{Digest, Sha256};
16use switchgear_migration::OnConflict;
17use switchgear_migration::{Expr, MigratorTrait};
18use uuid::Uuid;
19
20#[derive(Clone, Debug)]
21pub struct DbOfferStore {
22    db: DatabaseConnection,
23}
24
25impl DbOfferStore {
26    pub async fn connect(url: &str, max_connections: u32) -> Result<Self, OfferStoreError> {
27        let mut opt = sea_orm::ConnectOptions::new(url);
28        opt.max_connections(max_connections);
29        let db = Database::connect(opt.clone()).await.map_err(|e| {
30            OfferStoreError::from_db(
31                ServiceErrorSource::Internal,
32                format!("connecting to database with {opt:?}",),
33                e,
34            )
35        })?;
36
37        Ok(Self::from_db(db))
38    }
39
40    pub async fn migrate_up(&self) -> Result<(), OfferStoreError> {
41        switchgear_migration::OfferMigrator::up(&self.db, None)
42            .await
43            .map_err(|e| {
44                OfferStoreError::from_db(ServiceErrorSource::Internal, "migrating database up", e)
45            })?;
46        Ok(())
47    }
48
49    pub async fn migrate_down(&self) -> Result<(), OfferStoreError> {
50        switchgear_migration::OfferMigrator::down(&self.db, None)
51            .await
52            .map_err(|e| {
53                OfferStoreError::from_db(ServiceErrorSource::Internal, "migrating database down", e)
54            })?;
55        Ok(())
56    }
57
58    pub fn from_db(db: DatabaseConnection) -> Self {
59        Self { db }
60    }
61}
62
63#[async_trait]
64impl OfferStore for DbOfferStore {
65    type Error = OfferStoreError;
66
67    async fn get_offer(
68        &self,
69        partition: &str,
70        id: &Uuid,
71    ) -> Result<Option<OfferRecord>, Self::Error> {
72        let model = OfferRecordTable::find_by_id((partition.to_string(), *id))
73            .one(&self.db)
74            .await
75            .map_err(|e| {
76                OfferStoreError::from_db(
77                    ServiceErrorSource::Internal,
78                    format!("getting offer for partition {partition} id {id}"),
79                    e,
80                )
81            })?;
82
83        match model {
84            Some(model) => Ok(Some(OfferRecord {
85                partition: model.partition,
86                id: model.id,
87                offer: crate::api::offer::OfferRecordSparse {
88                    max_sendable: model.max_sendable as u64,
89                    min_sendable: model.min_sendable as u64,
90                    metadata_id: model.metadata_id,
91                    timestamp: model.timestamp.into(),
92                    expires: model.expires.map(|dt| dt.into()),
93                },
94            })),
95            None => Ok(None),
96        }
97    }
98
99    async fn get_offers(
100        &self,
101        partition: &str,
102        start: usize,
103        count: usize,
104    ) -> Result<Vec<OfferRecord>, Self::Error> {
105        let models = OfferRecordTable::find()
106            .filter(offer_record_table::Column::Partition.eq(partition))
107            .order_by_asc(offer_record_table::Column::CreatedAt)
108            .order_by_asc(offer_record_table::Column::Id)
109            .offset(start as u64)
110            .limit(count as u64)
111            .all(&self.db)
112            .await
113            .map_err(|e| {
114                OfferStoreError::from_db(
115                    ServiceErrorSource::Internal,
116                    format!("getting offers for partition {partition}"),
117                    e,
118                )
119            })?;
120
121        let mut offers = Vec::new();
122        for model in models {
123            offers.push(OfferRecord {
124                partition: model.partition,
125                id: model.id,
126                offer: crate::api::offer::OfferRecordSparse {
127                    max_sendable: model.max_sendable as u64,
128                    min_sendable: model.min_sendable as u64,
129                    metadata_id: model.metadata_id,
130                    timestamp: model.timestamp.into(),
131                    expires: model.expires.map(|dt| dt.into()),
132                },
133            });
134        }
135
136        Ok(offers)
137    }
138
139    async fn post_offer(&self, offer: OfferRecord) -> Result<Option<Uuid>, Self::Error> {
140        let now = Utc::now();
141        let active_model = offer_record_table::ActiveModel {
142            id: Set(offer.id),
143            partition: Set(offer.partition.clone()),
144            max_sendable: Set(offer.offer.max_sendable as i64),
145            min_sendable: Set(offer.offer.min_sendable as i64),
146            metadata_id: Set(offer.offer.metadata_id),
147            timestamp: Set(offer.offer.timestamp.into()),
148            expires: Set(offer.offer.expires.map(|dt| dt.into())),
149            created_at: Set(now.into()),
150            updated_at: Set(now.into()),
151        };
152
153        match OfferRecordTable::insert(active_model).exec(&self.db).await {
154            Ok(_) => Ok(Some(offer.id)),
155            // PostgreSQL unique constraint violation
156            Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
157                db_err,
158            )))) if db_err.is_unique_violation() => Ok(None),
159            // SQLite unique constraint violation
160            Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
161                db_err,
162            )))) if db_err.is_unique_violation() => Ok(None),
163            // Foreign key constraint violation (metadata_id doesn't exist)
164            Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
165                db_err,
166            )))) if db_err.is_foreign_key_violation() => Err(OfferStoreError::invalid_input_error(
167                format!("post offer {offer:?}"),
168                format!(
169                    "metadata {} not found for offer {}",
170                    offer.offer.metadata_id, offer.id
171                ),
172            )),
173            Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
174                db_err,
175            )))) if db_err.is_foreign_key_violation() => Err(OfferStoreError::invalid_input_error(
176                format!("post offer {offer:?}"),
177                format!(
178                    "metadata {} not found for offer {}",
179                    offer.offer.metadata_id, offer.id
180                ),
181            )),
182            Err(e) => Err(OfferStoreError::from_db(
183                ServiceErrorSource::Internal,
184                format!(
185                    "inserting offer for partition {} id {}",
186                    offer.partition, offer.id
187                ),
188                e,
189            )),
190        }
191    }
192
193    async fn put_offer(&self, offer: OfferRecord) -> Result<bool, Self::Error> {
194        let now = Utc::now();
195        let future_timestamp = now + chrono::Duration::seconds(1);
196
197        let active_model = offer_record_table::ActiveModel {
198            id: Set(offer.id),
199            partition: Set(offer.partition.clone()),
200            max_sendable: Set(offer.offer.max_sendable as i64),
201            min_sendable: Set(offer.offer.min_sendable as i64),
202            metadata_id: Set(offer.offer.metadata_id),
203            timestamp: Set(offer.offer.timestamp.into()),
204            expires: Set(offer.offer.expires.map(|dt| dt.into())),
205            created_at: Set(now.into()), // Set for initial insert
206            updated_at: Set(now.into()),
207        };
208
209        let _result = match OfferRecordTable::insert(active_model)
210            .on_conflict(
211                OnConflict::columns([
212                    offer_record_table::Column::Partition,
213                    offer_record_table::Column::Id,
214                ])
215                .update_columns([
216                    offer_record_table::Column::MaxSendable,
217                    offer_record_table::Column::MinSendable,
218                    offer_record_table::Column::MetadataId,
219                    offer_record_table::Column::Timestamp,
220                    offer_record_table::Column::Expires,
221                ])
222                .value(Column::UpdatedAt, Expr::val(future_timestamp))
223                .to_owned(),
224            )
225            .exec(&self.db)
226            .await
227        {
228            Ok(result) => result,
229            // Foreign key constraint violation (metadata_id doesn't exist)
230            Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
231                db_err,
232            )))) if db_err.is_foreign_key_violation() => {
233                return Err(OfferStoreError::invalid_input_error(
234                    format!("put offer {offer:?}"),
235                    format!(
236                        "metadata {} not found for offer {}",
237                        offer.offer.metadata_id, offer.id
238                    ),
239                ));
240            }
241            Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
242                db_err,
243            )))) if db_err.is_foreign_key_violation() => {
244                return Err(OfferStoreError::invalid_input_error(
245                    format!("put offer {offer:?}"),
246                    format!(
247                        "metadata {} not found for offer {}",
248                        offer.offer.metadata_id, offer.id
249                    ),
250                ));
251            }
252            Err(e) => {
253                return Err(OfferStoreError::from_db(
254                    ServiceErrorSource::Internal,
255                    format!(
256                        "upserting offer for partition {} id {}",
257                        offer.partition, offer.id
258                    ),
259                    e,
260                ));
261            }
262        };
263
264        // Fetch only the timestamps to compare
265        let result = OfferRecordTable::find()
266            .filter(offer_record_table::Column::Partition.eq(offer.partition.clone()))
267            .filter(offer_record_table::Column::Id.eq(offer.id))
268            .select_only()
269            .column(offer_record_table::Column::CreatedAt)
270            .column(offer_record_table::Column::UpdatedAt)
271            .into_tuple::<(
272                chrono::DateTime<chrono::FixedOffset>,
273                chrono::DateTime<chrono::FixedOffset>,
274            )>()
275            .one(&self.db)
276            .await
277            .map_err(|e| {
278                OfferStoreError::from_db(
279                    ServiceErrorSource::Internal,
280                    format!(
281                        "fetching offer after upsert for partition {} id {}",
282                        offer.partition, offer.id
283                    ),
284                    e,
285                )
286            })?
287            .ok_or_else(|| {
288                OfferStoreError::from_db(
289                    ServiceErrorSource::Internal,
290                    "upsert succeeded but record not found",
291                    sea_orm::DbErr::RecordNotFound(
292                        "Record should exist after successful upsert".to_string(),
293                    ),
294                )
295            })?;
296
297        // Compare timestamps to determine if it was insert (true) or update (false)
298        Ok(result.0 == result.1)
299    }
300
301    async fn delete_offer(&self, partition: &str, id: &Uuid) -> Result<bool, Self::Error> {
302        let result = OfferRecordTable::delete_by_id((partition.to_string(), *id))
303            .exec(&self.db)
304            .await
305            .map_err(|e| {
306                OfferStoreError::from_db(
307                    ServiceErrorSource::Internal,
308                    format!("deleting offer for partition {partition} id {id}"),
309                    e,
310                )
311            })?;
312
313        Ok(result.rows_affected > 0)
314    }
315}
316
317#[async_trait]
318impl OfferMetadataStore for DbOfferStore {
319    type Error = OfferStoreError;
320
321    async fn get_metadata(
322        &self,
323        partition: &str,
324        id: &Uuid,
325    ) -> Result<Option<OfferMetadata>, Self::Error> {
326        let model = OfferMetadataTable::find_by_id((partition.to_string(), *id))
327            .one(&self.db)
328            .await
329            .map_err(|e| {
330                OfferStoreError::from_db(
331                    ServiceErrorSource::Internal,
332                    format!("getting metadata for partition {partition} id {id}"),
333                    e,
334                )
335            })?;
336
337        match model {
338            Some(model) => {
339                let metadata = serde_json::from_value(model.metadata).map_err(|e| {
340                    OfferStoreError::serialization_error(
341                        ServiceErrorSource::Internal,
342                        format!("deserializing metadata for partition {partition} id {id}",),
343                        e,
344                    )
345                })?;
346
347                Ok(Some(OfferMetadata {
348                    id: model.id,
349                    partition: model.partition,
350                    metadata,
351                }))
352            }
353            None => Ok(None),
354        }
355    }
356
357    async fn get_all_metadata(
358        &self,
359        partition: &str,
360        start: usize,
361        count: usize,
362    ) -> Result<Vec<OfferMetadata>, Self::Error> {
363        let models = OfferMetadataTable::find()
364            .filter(offer_metadata_table::Column::Partition.eq(partition))
365            .order_by_asc(offer_metadata_table::Column::CreatedAt)
366            .order_by_asc(offer_metadata_table::Column::Id)
367            .offset(start as u64)
368            .limit(count as u64)
369            .all(&self.db)
370            .await
371            .map_err(|e| {
372                OfferStoreError::from_db(
373                    ServiceErrorSource::Internal,
374                    format!("getting all metadata for partition {partition}"),
375                    e,
376                )
377            })?;
378
379        let mut metadata_list = Vec::new();
380        for model in models {
381            let metadata = serde_json::from_value(model.metadata).map_err(|e| {
382                OfferStoreError::serialization_error(
383                    ServiceErrorSource::Internal,
384                    format!(
385                        "deserializing metadata for partition {} id {}",
386                        partition, model.id
387                    ),
388                    e,
389                )
390            })?;
391
392            metadata_list.push(OfferMetadata {
393                id: model.id,
394                partition: model.partition,
395                metadata,
396            });
397        }
398
399        Ok(metadata_list)
400    }
401
402    async fn post_metadata(&self, offer: OfferMetadata) -> Result<Option<Uuid>, Self::Error> {
403        let metadata_json = serde_json::to_value(&offer.metadata).map_err(|e| {
404            OfferStoreError::serialization_error(
405                ServiceErrorSource::Internal,
406                format!(
407                    "serializing metadata for partition {} id {}",
408                    offer.partition, offer.id
409                ),
410                e,
411            )
412        })?;
413
414        let now = Utc::now();
415        let active_model = offer_metadata_table::ActiveModel {
416            id: Set(offer.id),
417            partition: Set(offer.partition.clone()),
418            metadata: Set(metadata_json),
419            created_at: Set(now.into()),
420            updated_at: Set(now.into()),
421        };
422
423        match OfferMetadataTable::insert(active_model)
424            .exec(&self.db)
425            .await
426        {
427            Ok(_) => Ok(Some(offer.id)),
428            // PostgreSQL unique constraint violation
429            Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
430                db_err,
431            )))) if db_err.is_unique_violation() => Ok(None),
432            // SQLite unique constraint violation
433            Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
434                db_err,
435            )))) if db_err.is_unique_violation() => Ok(None),
436            Err(e) => Err(OfferStoreError::from_db(
437                ServiceErrorSource::Internal,
438                format!(
439                    "inserting metadata for partition {} id {}",
440                    offer.partition, offer.id
441                ),
442                e,
443            )),
444        }
445    }
446
447    async fn put_metadata(&self, offer: OfferMetadata) -> Result<bool, Self::Error> {
448        let metadata_json = serde_json::to_value(&offer.metadata).map_err(|e| {
449            OfferStoreError::serialization_error(
450                ServiceErrorSource::Internal,
451                format!(
452                    "serializing metadata for partition {} id {}",
453                    offer.partition, offer.id
454                ),
455                e,
456            )
457        })?;
458
459        let now = Utc::now();
460        let future_timestamp = now + chrono::Duration::seconds(1);
461
462        let active_model = offer_metadata_table::ActiveModel {
463            id: Set(offer.id),
464            partition: Set(offer.partition.clone()),
465            metadata: Set(metadata_json),
466            created_at: Set(now.into()), // Set for initial insert
467            updated_at: Set(now.into()),
468        };
469
470        let _result = OfferMetadataTable::insert(active_model)
471            .on_conflict(
472                OnConflict::columns([
473                    offer_metadata_table::Column::Partition,
474                    offer_metadata_table::Column::Id,
475                ])
476                .update_columns([offer_metadata_table::Column::Metadata])
477                .value(Column::UpdatedAt, Expr::val(future_timestamp))
478                .to_owned(),
479            )
480            .exec(&self.db)
481            .await
482            .map_err(|e| {
483                OfferStoreError::from_db(
484                    ServiceErrorSource::Internal,
485                    format!(
486                        "upserting metadata for partition {} id {}",
487                        offer.partition, offer.id
488                    ),
489                    e,
490                )
491            })?;
492
493        // Fetch only the timestamps to compare
494        let result = OfferMetadataTable::find()
495            .filter(offer_metadata_table::Column::Partition.eq(offer.partition.clone()))
496            .filter(offer_metadata_table::Column::Id.eq(offer.id))
497            .select_only()
498            .column(offer_metadata_table::Column::CreatedAt)
499            .column(offer_metadata_table::Column::UpdatedAt)
500            .into_tuple::<(
501                chrono::DateTime<chrono::FixedOffset>,
502                chrono::DateTime<chrono::FixedOffset>,
503            )>()
504            .one(&self.db)
505            .await
506            .map_err(|e| {
507                OfferStoreError::from_db(
508                    ServiceErrorSource::Internal,
509                    format!(
510                        "fetching metadata after upsert for partition {} id {}",
511                        offer.partition, offer.id
512                    ),
513                    e,
514                )
515            })?
516            .ok_or_else(|| {
517                OfferStoreError::from_db(
518                    ServiceErrorSource::Internal,
519                    "upsert succeeded but record not found",
520                    sea_orm::DbErr::RecordNotFound(
521                        "Record should exist after successful upsert".to_string(),
522                    ),
523                )
524            })?;
525
526        // Compare timestamps to determine if it was insert (true) or update (false)
527        Ok(result.0 == result.1)
528    }
529
530    async fn delete_metadata(&self, partition: &str, id: &Uuid) -> Result<bool, Self::Error> {
531        let result = OfferMetadataTable::delete_by_id((partition.to_string(), *id))
532            .exec(&self.db)
533            .await
534            .map_err(|e| match e {
535                sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
536                    db_err,
537                ))) if db_err.is_foreign_key_violation()
538                    // sqlite
539                    || db_err.code().as_deref() == Some("1811") =>
540                {
541                    OfferStoreError::invalid_input_error(
542                        format!("deleting metadata for partition {partition} id {id}"),
543                        format!("metadata {} is referenced by existing offers", id),
544                    )
545                }
546                _ => OfferStoreError::from_db(
547                    ServiceErrorSource::Internal,
548                    format!("deleting metadata for partition {partition} id {id}"),
549                    e,
550                ),
551            })?;
552
553        Ok(result.rows_affected > 0)
554    }
555}
556
557#[async_trait]
558impl OfferProvider for DbOfferStore {
559    type Error = OfferStoreError;
560
561    async fn offer(
562        &self,
563        _hostname: &str,
564        partition: &str,
565        id: &Uuid,
566    ) -> Result<Option<Offer>, Self::Error> {
567        let result = OfferRecordTable::find_by_id((partition.to_string(), *id))
568            .find_also_related(OfferMetadataTable)
569            .one(&self.db)
570            .await
571            .map_err(|e| {
572                OfferStoreError::from_db(
573                    ServiceErrorSource::Internal,
574                    format!("getting offer with metadata for partition {partition} id {id}",),
575                    e,
576                )
577            })?;
578
579        let (offer_model, metadata_model) = match result {
580            Some((offer, Some(metadata))) => (offer, metadata),
581            _ => return Ok(None),
582        };
583
584        use crate::api::offer::OfferMetadataSparse;
585        let metadata_sparse: OfferMetadataSparse = serde_json::from_value(metadata_model.metadata)
586            .map_err(|e| {
587                OfferStoreError::serialization_error(
588                    ServiceErrorSource::Internal,
589                    format!("deserializing metadata for offer {id} in partition {partition}",),
590                    e,
591                )
592            })?;
593
594        use crate::api::lnurl::LnUrlOfferMetadata;
595        let lnurl_metadata = LnUrlOfferMetadata(metadata_sparse);
596        let metadata_json_string = serde_json::to_string(&lnurl_metadata).map_err(|e| {
597            OfferStoreError::serialization_error(
598                ServiceErrorSource::Internal,
599                format!("serializing metadata for offer {id} in partition {partition}",),
600                e,
601            )
602        })?;
603
604        let mut hasher = Sha256::new();
605        hasher.update(metadata_json_string.as_bytes());
606        let metadata_json_hash = hasher.finalize().into();
607
608        Ok(Some(Offer {
609            partition: offer_model.partition,
610            id: offer_model.id,
611            max_sendable: offer_model.max_sendable as u64,
612            min_sendable: offer_model.min_sendable as u64,
613            metadata_json_string,
614            metadata_json_hash,
615            timestamp: offer_model.timestamp.with_timezone(&Utc),
616            expires: offer_model.expires.map(|dt| dt.with_timezone(&Utc)),
617        }))
618    }
619}