switchgear_components/offer/
db.rs

1use crate::discovery::db::Column;
2use crate::offer::db_orm::prelude::*;
3use crate::offer::db_orm::{offer_metadata_table, offer_record_table};
4use crate::offer::error::OfferStoreError;
5use async_trait::async_trait;
6use chrono::Utc;
7use sea_orm::{
8    ColumnTrait, Database, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder, QuerySelect,
9    Set,
10};
11use sha2::{Digest, Sha256};
12use switchgear_migration::OnConflict;
13use switchgear_migration::{Expr, MigratorTrait};
14use switchgear_service_api::lnurl::LnUrlOfferMetadata;
15use switchgear_service_api::offer::{
16    Offer, OfferMetadata, OfferMetadataSparse, OfferMetadataStore, OfferProvider, OfferRecord,
17    OfferRecordSparse, OfferStore,
18};
19use switchgear_service_api::service::ServiceErrorSource;
20use uuid::Uuid;
21
22#[derive(Clone, Debug)]
23pub struct DbOfferStore {
24    db: DatabaseConnection,
25}
26
27impl DbOfferStore {
28    pub async fn connect(uri: &str, max_connections: u32) -> Result<Self, OfferStoreError> {
29        let mut opt = sea_orm::ConnectOptions::new(uri);
30        opt.max_connections(max_connections);
31        let db = Database::connect(opt).await.map_err(|e| {
32            OfferStoreError::from_db(
33                ServiceErrorSource::Internal,
34                "connecting to offer database",
35                e,
36            )
37        })?;
38
39        Ok(Self::from_db(db))
40    }
41
42    pub async fn migrate_up(&self) -> Result<(), OfferStoreError> {
43        switchgear_migration::OfferMigrator::up(&self.db, None)
44            .await
45            .map_err(|e| {
46                OfferStoreError::from_db(ServiceErrorSource::Internal, "migrating database up", e)
47            })?;
48        Ok(())
49    }
50
51    pub async fn migrate_down(&self) -> Result<(), OfferStoreError> {
52        switchgear_migration::OfferMigrator::down(&self.db, None)
53            .await
54            .map_err(|e| {
55                OfferStoreError::from_db(ServiceErrorSource::Internal, "migrating database down", e)
56            })?;
57        Ok(())
58    }
59
60    pub fn from_db(db: DatabaseConnection) -> Self {
61        Self { db }
62    }
63}
64
65#[async_trait]
66impl OfferStore for DbOfferStore {
67    type Error = OfferStoreError;
68
69    async fn get_offer(
70        &self,
71        partition: &str,
72        id: &Uuid,
73    ) -> Result<Option<OfferRecord>, Self::Error> {
74        let model = OfferRecordTable::find_by_id((partition.to_string(), *id))
75            .one(&self.db)
76            .await
77            .map_err(|e| {
78                OfferStoreError::from_db(
79                    ServiceErrorSource::Internal,
80                    format!("getting offer for partition {partition} id {id}"),
81                    e,
82                )
83            })?;
84
85        match model {
86            Some(model) => Ok(Some(OfferRecord {
87                partition: model.partition,
88                id: model.id,
89                offer: OfferRecordSparse {
90                    max_sendable: model.max_sendable as u64,
91                    min_sendable: model.min_sendable as u64,
92                    metadata_id: model.metadata_id,
93                    timestamp: model.timestamp.into(),
94                    expires: model.expires.map(|dt| dt.into()),
95                },
96            })),
97            None => Ok(None),
98        }
99    }
100
101    async fn get_offers(
102        &self,
103        partition: &str,
104        start: usize,
105        count: usize,
106    ) -> Result<Vec<OfferRecord>, Self::Error> {
107        let models = OfferRecordTable::find()
108            .filter(offer_record_table::Column::Partition.eq(partition))
109            .order_by_asc(offer_record_table::Column::CreatedAt)
110            .order_by_asc(offer_record_table::Column::Id)
111            .offset(start as u64)
112            .limit(count as u64)
113            .all(&self.db)
114            .await
115            .map_err(|e| {
116                OfferStoreError::from_db(
117                    ServiceErrorSource::Internal,
118                    format!("getting offers for partition {partition}"),
119                    e,
120                )
121            })?;
122
123        let mut offers = Vec::new();
124        for model in models {
125            offers.push(OfferRecord {
126                partition: model.partition,
127                id: model.id,
128                offer: OfferRecordSparse {
129                    max_sendable: model.max_sendable as u64,
130                    min_sendable: model.min_sendable as u64,
131                    metadata_id: model.metadata_id,
132                    timestamp: model.timestamp.into(),
133                    expires: model.expires.map(|dt| dt.into()),
134                },
135            });
136        }
137
138        Ok(offers)
139    }
140
141    async fn post_offer(&self, offer: OfferRecord) -> Result<Option<Uuid>, Self::Error> {
142        let now = Utc::now();
143        let active_model = offer_record_table::ActiveModel {
144            id: Set(offer.id),
145            partition: Set(offer.partition.clone()),
146            max_sendable: Set(offer.offer.max_sendable as i64),
147            min_sendable: Set(offer.offer.min_sendable as i64),
148            metadata_id: Set(offer.offer.metadata_id),
149            timestamp: Set(offer.offer.timestamp.into()),
150            expires: Set(offer.offer.expires.map(|dt| dt.into())),
151            created_at: Set(now.into()),
152            updated_at: Set(now.into()),
153        };
154
155        match OfferRecordTable::insert(active_model).exec(&self.db).await {
156            Ok(_) => Ok(Some(offer.id)),
157            // PostgreSQL unique constraint violation
158            Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
159                db_err,
160            )))) if db_err.is_unique_violation() => Ok(None),
161            // SQLite unique constraint violation
162            Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
163                db_err,
164            )))) if db_err.is_unique_violation() => Ok(None),
165            // Foreign key constraint violation (metadata_id doesn't exist)
166            Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
167                db_err,
168            )))) if db_err.is_foreign_key_violation() => Err(OfferStoreError::invalid_input_error(
169                format!("post offer {offer:?}"),
170                format!(
171                    "metadata {} not found for offer {}",
172                    offer.offer.metadata_id, offer.id
173                ),
174            )),
175            Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
176                db_err,
177            )))) if db_err.is_foreign_key_violation() => Err(OfferStoreError::invalid_input_error(
178                format!("post offer {offer:?}"),
179                format!(
180                    "metadata {} not found for offer {}",
181                    offer.offer.metadata_id, offer.id
182                ),
183            )),
184            Err(e) => Err(OfferStoreError::from_db(
185                ServiceErrorSource::Internal,
186                format!(
187                    "inserting offer for partition {} id {}",
188                    offer.partition, offer.id
189                ),
190                e,
191            )),
192        }
193    }
194
195    async fn put_offer(&self, offer: OfferRecord) -> Result<bool, Self::Error> {
196        let now = Utc::now();
197        let future_timestamp = now + chrono::Duration::seconds(1);
198
199        let active_model = offer_record_table::ActiveModel {
200            id: Set(offer.id),
201            partition: Set(offer.partition.clone()),
202            max_sendable: Set(offer.offer.max_sendable as i64),
203            min_sendable: Set(offer.offer.min_sendable as i64),
204            metadata_id: Set(offer.offer.metadata_id),
205            timestamp: Set(offer.offer.timestamp.into()),
206            expires: Set(offer.offer.expires.map(|dt| dt.into())),
207            created_at: Set(now.into()), // Set for initial insert
208            updated_at: Set(now.into()),
209        };
210
211        let _result = match OfferRecordTable::insert(active_model)
212            .on_conflict(
213                OnConflict::columns([
214                    offer_record_table::Column::Partition,
215                    offer_record_table::Column::Id,
216                ])
217                .update_columns([
218                    offer_record_table::Column::MaxSendable,
219                    offer_record_table::Column::MinSendable,
220                    offer_record_table::Column::MetadataId,
221                    offer_record_table::Column::Timestamp,
222                    offer_record_table::Column::Expires,
223                ])
224                .value(Column::UpdatedAt, Expr::val(future_timestamp))
225                .to_owned(),
226            )
227            .exec(&self.db)
228            .await
229        {
230            Ok(result) => result,
231            // Foreign key constraint violation (metadata_id doesn't exist)
232            Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
233                db_err,
234            )))) if db_err.is_foreign_key_violation() => {
235                return Err(OfferStoreError::invalid_input_error(
236                    format!("put offer {offer:?}"),
237                    format!(
238                        "metadata {} not found for offer {}",
239                        offer.offer.metadata_id, offer.id
240                    ),
241                ));
242            }
243            Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
244                db_err,
245            )))) if db_err.is_foreign_key_violation() => {
246                return Err(OfferStoreError::invalid_input_error(
247                    format!("put offer {offer:?}"),
248                    format!(
249                        "metadata {} not found for offer {}",
250                        offer.offer.metadata_id, offer.id
251                    ),
252                ));
253            }
254            Err(e) => {
255                return Err(OfferStoreError::from_db(
256                    ServiceErrorSource::Internal,
257                    format!(
258                        "upserting offer for partition {} id {}",
259                        offer.partition, offer.id
260                    ),
261                    e,
262                ));
263            }
264        };
265
266        // Fetch only the timestamps to compare
267        let result = OfferRecordTable::find()
268            .filter(offer_record_table::Column::Partition.eq(offer.partition.clone()))
269            .filter(offer_record_table::Column::Id.eq(offer.id))
270            .select_only()
271            .column(offer_record_table::Column::CreatedAt)
272            .column(offer_record_table::Column::UpdatedAt)
273            .into_tuple::<(
274                chrono::DateTime<chrono::FixedOffset>,
275                chrono::DateTime<chrono::FixedOffset>,
276            )>()
277            .one(&self.db)
278            .await
279            .map_err(|e| {
280                OfferStoreError::from_db(
281                    ServiceErrorSource::Internal,
282                    format!(
283                        "fetching offer after upsert for partition {} id {}",
284                        offer.partition, offer.id
285                    ),
286                    e,
287                )
288            })?
289            .ok_or_else(|| {
290                OfferStoreError::from_db(
291                    ServiceErrorSource::Internal,
292                    "upsert succeeded but record not found",
293                    sea_orm::DbErr::RecordNotFound(
294                        "Record should exist after successful upsert".to_string(),
295                    ),
296                )
297            })?;
298
299        // Compare timestamps to determine if it was insert (true) or update (false)
300        Ok(result.0 == result.1)
301    }
302
303    async fn delete_offer(&self, partition: &str, id: &Uuid) -> Result<bool, Self::Error> {
304        let result = OfferRecordTable::delete_by_id((partition.to_string(), *id))
305            .exec(&self.db)
306            .await
307            .map_err(|e| {
308                OfferStoreError::from_db(
309                    ServiceErrorSource::Internal,
310                    format!("deleting offer for partition {partition} id {id}"),
311                    e,
312                )
313            })?;
314
315        Ok(result.rows_affected > 0)
316    }
317}
318
319#[async_trait]
320impl OfferMetadataStore for DbOfferStore {
321    type Error = OfferStoreError;
322
323    async fn get_metadata(
324        &self,
325        partition: &str,
326        id: &Uuid,
327    ) -> Result<Option<OfferMetadata>, Self::Error> {
328        let model = OfferMetadataTable::find_by_id((partition.to_string(), *id))
329            .one(&self.db)
330            .await
331            .map_err(|e| {
332                OfferStoreError::from_db(
333                    ServiceErrorSource::Internal,
334                    format!("getting metadata for partition {partition} id {id}"),
335                    e,
336                )
337            })?;
338
339        match model {
340            Some(model) => {
341                let metadata = serde_json::from_value(model.metadata).map_err(|e| {
342                    OfferStoreError::serialization_error(
343                        ServiceErrorSource::Internal,
344                        format!("deserializing metadata for partition {partition} id {id}",),
345                        e,
346                    )
347                })?;
348
349                Ok(Some(OfferMetadata {
350                    id: model.id,
351                    partition: model.partition,
352                    metadata,
353                }))
354            }
355            None => Ok(None),
356        }
357    }
358
359    async fn get_all_metadata(
360        &self,
361        partition: &str,
362        start: usize,
363        count: usize,
364    ) -> Result<Vec<OfferMetadata>, Self::Error> {
365        let models = OfferMetadataTable::find()
366            .filter(offer_metadata_table::Column::Partition.eq(partition))
367            .order_by_asc(offer_metadata_table::Column::CreatedAt)
368            .order_by_asc(offer_metadata_table::Column::Id)
369            .offset(start as u64)
370            .limit(count as u64)
371            .all(&self.db)
372            .await
373            .map_err(|e| {
374                OfferStoreError::from_db(
375                    ServiceErrorSource::Internal,
376                    format!("getting all metadata for partition {partition}"),
377                    e,
378                )
379            })?;
380
381        let mut metadata_list = Vec::new();
382        for model in models {
383            let metadata = serde_json::from_value(model.metadata).map_err(|e| {
384                OfferStoreError::serialization_error(
385                    ServiceErrorSource::Internal,
386                    format!(
387                        "deserializing metadata for partition {} id {}",
388                        partition, model.id
389                    ),
390                    e,
391                )
392            })?;
393
394            metadata_list.push(OfferMetadata {
395                id: model.id,
396                partition: model.partition,
397                metadata,
398            });
399        }
400
401        Ok(metadata_list)
402    }
403
404    async fn post_metadata(&self, offer: OfferMetadata) -> Result<Option<Uuid>, Self::Error> {
405        let metadata_json = serde_json::to_value(&offer.metadata).map_err(|e| {
406            OfferStoreError::serialization_error(
407                ServiceErrorSource::Internal,
408                format!(
409                    "serializing metadata for partition {} id {}",
410                    offer.partition, offer.id
411                ),
412                e,
413            )
414        })?;
415
416        let now = Utc::now();
417        let active_model = offer_metadata_table::ActiveModel {
418            id: Set(offer.id),
419            partition: Set(offer.partition.clone()),
420            metadata: Set(metadata_json),
421            created_at: Set(now.into()),
422            updated_at: Set(now.into()),
423        };
424
425        match OfferMetadataTable::insert(active_model)
426            .exec(&self.db)
427            .await
428        {
429            Ok(_) => Ok(Some(offer.id)),
430            // PostgreSQL unique constraint violation
431            Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
432                db_err,
433            )))) if db_err.is_unique_violation() => Ok(None),
434            // SQLite unique constraint violation
435            Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
436                db_err,
437            )))) if db_err.is_unique_violation() => Ok(None),
438            Err(e) => Err(OfferStoreError::from_db(
439                ServiceErrorSource::Internal,
440                format!(
441                    "inserting metadata for partition {} id {}",
442                    offer.partition, offer.id
443                ),
444                e,
445            )),
446        }
447    }
448
449    async fn put_metadata(&self, offer: OfferMetadata) -> Result<bool, Self::Error> {
450        let metadata_json = serde_json::to_value(&offer.metadata).map_err(|e| {
451            OfferStoreError::serialization_error(
452                ServiceErrorSource::Internal,
453                format!(
454                    "serializing metadata for partition {} id {}",
455                    offer.partition, offer.id
456                ),
457                e,
458            )
459        })?;
460
461        let now = Utc::now();
462        let future_timestamp = now + chrono::Duration::seconds(1);
463
464        let active_model = offer_metadata_table::ActiveModel {
465            id: Set(offer.id),
466            partition: Set(offer.partition.clone()),
467            metadata: Set(metadata_json),
468            created_at: Set(now.into()), // Set for initial insert
469            updated_at: Set(now.into()),
470        };
471
472        let _result = OfferMetadataTable::insert(active_model)
473            .on_conflict(
474                OnConflict::columns([
475                    offer_metadata_table::Column::Partition,
476                    offer_metadata_table::Column::Id,
477                ])
478                .update_columns([offer_metadata_table::Column::Metadata])
479                .value(Column::UpdatedAt, Expr::val(future_timestamp))
480                .to_owned(),
481            )
482            .exec(&self.db)
483            .await
484            .map_err(|e| {
485                OfferStoreError::from_db(
486                    ServiceErrorSource::Internal,
487                    format!(
488                        "upserting metadata for partition {} id {}",
489                        offer.partition, offer.id
490                    ),
491                    e,
492                )
493            })?;
494
495        // Fetch only the timestamps to compare
496        let result = OfferMetadataTable::find()
497            .filter(offer_metadata_table::Column::Partition.eq(offer.partition.clone()))
498            .filter(offer_metadata_table::Column::Id.eq(offer.id))
499            .select_only()
500            .column(offer_metadata_table::Column::CreatedAt)
501            .column(offer_metadata_table::Column::UpdatedAt)
502            .into_tuple::<(
503                chrono::DateTime<chrono::FixedOffset>,
504                chrono::DateTime<chrono::FixedOffset>,
505            )>()
506            .one(&self.db)
507            .await
508            .map_err(|e| {
509                OfferStoreError::from_db(
510                    ServiceErrorSource::Internal,
511                    format!(
512                        "fetching metadata after upsert for partition {} id {}",
513                        offer.partition, offer.id
514                    ),
515                    e,
516                )
517            })?
518            .ok_or_else(|| {
519                OfferStoreError::from_db(
520                    ServiceErrorSource::Internal,
521                    "upsert succeeded but record not found",
522                    sea_orm::DbErr::RecordNotFound(
523                        "Record should exist after successful upsert".to_string(),
524                    ),
525                )
526            })?;
527
528        // Compare timestamps to determine if it was insert (true) or update (false)
529        Ok(result.0 == result.1)
530    }
531
532    async fn delete_metadata(&self, partition: &str, id: &Uuid) -> Result<bool, Self::Error> {
533        let result = OfferMetadataTable::delete_by_id((partition.to_string(), *id))
534            .exec(&self.db)
535            .await
536            .map_err(|e| match e {
537                sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
538                    db_err,
539                ))) if db_err.is_foreign_key_violation()
540                    // sqlite
541                    || db_err.code().as_deref() == Some("1811") =>
542                {
543                    OfferStoreError::invalid_input_error(
544                        format!("deleting metadata for partition {partition} id {id}"),
545                        format!("metadata {} is referenced by existing offers", id),
546                    )
547                }
548                _ => OfferStoreError::from_db(
549                    ServiceErrorSource::Internal,
550                    format!("deleting metadata for partition {partition} id {id}"),
551                    e,
552                ),
553            })?;
554
555        Ok(result.rows_affected > 0)
556    }
557}
558
559#[async_trait]
560impl OfferProvider for DbOfferStore {
561    type Error = OfferStoreError;
562
563    async fn offer(
564        &self,
565        _hostname: &str,
566        partition: &str,
567        id: &Uuid,
568    ) -> Result<Option<Offer>, Self::Error> {
569        let result = OfferRecordTable::find_by_id((partition.to_string(), *id))
570            .find_also_related(OfferMetadataTable)
571            .one(&self.db)
572            .await
573            .map_err(|e| {
574                OfferStoreError::from_db(
575                    ServiceErrorSource::Internal,
576                    format!("getting offer with metadata for partition {partition} id {id}",),
577                    e,
578                )
579            })?;
580
581        let (offer_model, metadata_model) = match result {
582            Some((offer, Some(metadata))) => (offer, metadata),
583            _ => return Ok(None),
584        };
585
586        let metadata_sparse: OfferMetadataSparse = serde_json::from_value(metadata_model.metadata)
587            .map_err(|e| {
588                OfferStoreError::serialization_error(
589                    ServiceErrorSource::Internal,
590                    format!("deserializing metadata for offer {id} in partition {partition}",),
591                    e,
592                )
593            })?;
594
595        let lnurl_metadata = LnUrlOfferMetadata(metadata_sparse);
596        let metadata_json_string = serde_json::to_string(&lnurl_metadata).map_err(|e| {
597            OfferStoreError::serialization_error(
598                ServiceErrorSource::Internal,
599                format!("serializing metadata for offer {id} in partition {partition}",),
600                e,
601            )
602        })?;
603
604        let mut hasher = Sha256::new();
605        hasher.update(metadata_json_string.as_bytes());
606        let metadata_json_hash = hasher.finalize().into();
607
608        Ok(Some(Offer {
609            partition: offer_model.partition,
610            id: offer_model.id,
611            max_sendable: offer_model.max_sendable as u64,
612            min_sendable: offer_model.min_sendable as u64,
613            metadata_json_string,
614            metadata_json_hash,
615            timestamp: offer_model.timestamp.with_timezone(&Utc),
616            expires: offer_model.expires.map(|dt| dt.with_timezone(&Utc)),
617        }))
618    }
619}