1use crate::api::offer::{
2 Offer, OfferMetadata, OfferMetadataStore, OfferProvider, OfferRecord, OfferStore,
3};
4use crate::api::service::ServiceErrorSource;
5use crate::components::discovery::db::Column;
6use crate::components::offer::db_orm::prelude::*;
7use crate::components::offer::db_orm::{offer_metadata_table, offer_record_table};
8use crate::components::offer::error::OfferStoreError;
9use async_trait::async_trait;
10use chrono::Utc;
11use sea_orm::{
12 ColumnTrait, Database, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder, QuerySelect,
13 Set,
14};
15use sha2::{Digest, Sha256};
16use switchgear_migration::OnConflict;
17use switchgear_migration::{Expr, MigratorTrait};
18use uuid::Uuid;
19
20#[derive(Clone, Debug)]
21pub struct DbOfferStore {
22 db: DatabaseConnection,
23}
24
25impl DbOfferStore {
26 pub async fn connect(url: &str, max_connections: u32) -> Result<Self, OfferStoreError> {
27 let mut opt = sea_orm::ConnectOptions::new(url);
28 opt.max_connections(max_connections);
29 let db = Database::connect(opt.clone()).await.map_err(|e| {
30 OfferStoreError::from_db(
31 ServiceErrorSource::Internal,
32 format!("connecting to database with {opt:?}",),
33 e,
34 )
35 })?;
36
37 Ok(Self::from_db(db))
38 }
39
40 pub async fn migrate_up(&self) -> Result<(), OfferStoreError> {
41 switchgear_migration::OfferMigrator::up(&self.db, None)
42 .await
43 .map_err(|e| {
44 OfferStoreError::from_db(ServiceErrorSource::Internal, "migrating database up", e)
45 })?;
46 Ok(())
47 }
48
49 pub async fn migrate_down(&self) -> Result<(), OfferStoreError> {
50 switchgear_migration::OfferMigrator::down(&self.db, None)
51 .await
52 .map_err(|e| {
53 OfferStoreError::from_db(ServiceErrorSource::Internal, "migrating database down", e)
54 })?;
55 Ok(())
56 }
57
58 pub fn from_db(db: DatabaseConnection) -> Self {
59 Self { db }
60 }
61}
62
63#[async_trait]
64impl OfferStore for DbOfferStore {
65 type Error = OfferStoreError;
66
67 async fn get_offer(
68 &self,
69 partition: &str,
70 id: &Uuid,
71 ) -> Result<Option<OfferRecord>, Self::Error> {
72 let model = OfferRecordTable::find_by_id((partition.to_string(), *id))
73 .one(&self.db)
74 .await
75 .map_err(|e| {
76 OfferStoreError::from_db(
77 ServiceErrorSource::Internal,
78 format!("getting offer for partition {partition} id {id}"),
79 e,
80 )
81 })?;
82
83 match model {
84 Some(model) => Ok(Some(OfferRecord {
85 partition: model.partition,
86 id: model.id,
87 offer: crate::api::offer::OfferRecordSparse {
88 max_sendable: model.max_sendable as u64,
89 min_sendable: model.min_sendable as u64,
90 metadata_id: model.metadata_id,
91 timestamp: model.timestamp.into(),
92 expires: model.expires.map(|dt| dt.into()),
93 },
94 })),
95 None => Ok(None),
96 }
97 }
98
99 async fn get_offers(
100 &self,
101 partition: &str,
102 start: usize,
103 count: usize,
104 ) -> Result<Vec<OfferRecord>, Self::Error> {
105 let models = OfferRecordTable::find()
106 .filter(offer_record_table::Column::Partition.eq(partition))
107 .order_by_asc(offer_record_table::Column::CreatedAt)
108 .order_by_asc(offer_record_table::Column::Id)
109 .offset(start as u64)
110 .limit(count as u64)
111 .all(&self.db)
112 .await
113 .map_err(|e| {
114 OfferStoreError::from_db(
115 ServiceErrorSource::Internal,
116 format!("getting offers for partition {partition}"),
117 e,
118 )
119 })?;
120
121 let mut offers = Vec::new();
122 for model in models {
123 offers.push(OfferRecord {
124 partition: model.partition,
125 id: model.id,
126 offer: crate::api::offer::OfferRecordSparse {
127 max_sendable: model.max_sendable as u64,
128 min_sendable: model.min_sendable as u64,
129 metadata_id: model.metadata_id,
130 timestamp: model.timestamp.into(),
131 expires: model.expires.map(|dt| dt.into()),
132 },
133 });
134 }
135
136 Ok(offers)
137 }
138
139 async fn post_offer(&self, offer: OfferRecord) -> Result<Option<Uuid>, Self::Error> {
140 let now = Utc::now();
141 let active_model = offer_record_table::ActiveModel {
142 id: Set(offer.id),
143 partition: Set(offer.partition.clone()),
144 max_sendable: Set(offer.offer.max_sendable as i64),
145 min_sendable: Set(offer.offer.min_sendable as i64),
146 metadata_id: Set(offer.offer.metadata_id),
147 timestamp: Set(offer.offer.timestamp.into()),
148 expires: Set(offer.offer.expires.map(|dt| dt.into())),
149 created_at: Set(now.into()),
150 updated_at: Set(now.into()),
151 };
152
153 match OfferRecordTable::insert(active_model).exec(&self.db).await {
154 Ok(_) => Ok(Some(offer.id)),
155 Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
157 db_err,
158 )))) if db_err.is_unique_violation() => Ok(None),
159 Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
161 db_err,
162 )))) if db_err.is_unique_violation() => Ok(None),
163 Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
165 db_err,
166 )))) if db_err.is_foreign_key_violation() => Err(OfferStoreError::invalid_input_error(
167 format!("post offer {offer:?}"),
168 format!(
169 "metadata {} not found for offer {}",
170 offer.offer.metadata_id, offer.id
171 ),
172 )),
173 Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
174 db_err,
175 )))) if db_err.is_foreign_key_violation() => Err(OfferStoreError::invalid_input_error(
176 format!("post offer {offer:?}"),
177 format!(
178 "metadata {} not found for offer {}",
179 offer.offer.metadata_id, offer.id
180 ),
181 )),
182 Err(e) => Err(OfferStoreError::from_db(
183 ServiceErrorSource::Internal,
184 format!(
185 "inserting offer for partition {} id {}",
186 offer.partition, offer.id
187 ),
188 e,
189 )),
190 }
191 }
192
193 async fn put_offer(&self, offer: OfferRecord) -> Result<bool, Self::Error> {
194 let now = Utc::now();
195 let future_timestamp = now + chrono::Duration::seconds(1);
196
197 let active_model = offer_record_table::ActiveModel {
198 id: Set(offer.id),
199 partition: Set(offer.partition.clone()),
200 max_sendable: Set(offer.offer.max_sendable as i64),
201 min_sendable: Set(offer.offer.min_sendable as i64),
202 metadata_id: Set(offer.offer.metadata_id),
203 timestamp: Set(offer.offer.timestamp.into()),
204 expires: Set(offer.offer.expires.map(|dt| dt.into())),
205 created_at: Set(now.into()), updated_at: Set(now.into()),
207 };
208
209 let _result = match OfferRecordTable::insert(active_model)
210 .on_conflict(
211 OnConflict::columns([
212 offer_record_table::Column::Partition,
213 offer_record_table::Column::Id,
214 ])
215 .update_columns([
216 offer_record_table::Column::MaxSendable,
217 offer_record_table::Column::MinSendable,
218 offer_record_table::Column::MetadataId,
219 offer_record_table::Column::Timestamp,
220 offer_record_table::Column::Expires,
221 ])
222 .value(Column::UpdatedAt, Expr::val(future_timestamp))
223 .to_owned(),
224 )
225 .exec(&self.db)
226 .await
227 {
228 Ok(result) => result,
229 Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
231 db_err,
232 )))) if db_err.is_foreign_key_violation() => {
233 return Err(OfferStoreError::invalid_input_error(
234 format!("put offer {offer:?}"),
235 format!(
236 "metadata {} not found for offer {}",
237 offer.offer.metadata_id, offer.id
238 ),
239 ));
240 }
241 Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
242 db_err,
243 )))) if db_err.is_foreign_key_violation() => {
244 return Err(OfferStoreError::invalid_input_error(
245 format!("put offer {offer:?}"),
246 format!(
247 "metadata {} not found for offer {}",
248 offer.offer.metadata_id, offer.id
249 ),
250 ));
251 }
252 Err(e) => {
253 return Err(OfferStoreError::from_db(
254 ServiceErrorSource::Internal,
255 format!(
256 "upserting offer for partition {} id {}",
257 offer.partition, offer.id
258 ),
259 e,
260 ));
261 }
262 };
263
264 let result = OfferRecordTable::find()
266 .filter(offer_record_table::Column::Partition.eq(offer.partition.clone()))
267 .filter(offer_record_table::Column::Id.eq(offer.id))
268 .select_only()
269 .column(offer_record_table::Column::CreatedAt)
270 .column(offer_record_table::Column::UpdatedAt)
271 .into_tuple::<(
272 chrono::DateTime<chrono::FixedOffset>,
273 chrono::DateTime<chrono::FixedOffset>,
274 )>()
275 .one(&self.db)
276 .await
277 .map_err(|e| {
278 OfferStoreError::from_db(
279 ServiceErrorSource::Internal,
280 format!(
281 "fetching offer after upsert for partition {} id {}",
282 offer.partition, offer.id
283 ),
284 e,
285 )
286 })?
287 .ok_or_else(|| {
288 OfferStoreError::from_db(
289 ServiceErrorSource::Internal,
290 "upsert succeeded but record not found",
291 sea_orm::DbErr::RecordNotFound(
292 "Record should exist after successful upsert".to_string(),
293 ),
294 )
295 })?;
296
297 Ok(result.0 == result.1)
299 }
300
301 async fn delete_offer(&self, partition: &str, id: &Uuid) -> Result<bool, Self::Error> {
302 let result = OfferRecordTable::delete_by_id((partition.to_string(), *id))
303 .exec(&self.db)
304 .await
305 .map_err(|e| {
306 OfferStoreError::from_db(
307 ServiceErrorSource::Internal,
308 format!("deleting offer for partition {partition} id {id}"),
309 e,
310 )
311 })?;
312
313 Ok(result.rows_affected > 0)
314 }
315}
316
317#[async_trait]
318impl OfferMetadataStore for DbOfferStore {
319 type Error = OfferStoreError;
320
321 async fn get_metadata(
322 &self,
323 partition: &str,
324 id: &Uuid,
325 ) -> Result<Option<OfferMetadata>, Self::Error> {
326 let model = OfferMetadataTable::find_by_id((partition.to_string(), *id))
327 .one(&self.db)
328 .await
329 .map_err(|e| {
330 OfferStoreError::from_db(
331 ServiceErrorSource::Internal,
332 format!("getting metadata for partition {partition} id {id}"),
333 e,
334 )
335 })?;
336
337 match model {
338 Some(model) => {
339 let metadata = serde_json::from_value(model.metadata).map_err(|e| {
340 OfferStoreError::serialization_error(
341 ServiceErrorSource::Internal,
342 format!("deserializing metadata for partition {partition} id {id}",),
343 e,
344 )
345 })?;
346
347 Ok(Some(OfferMetadata {
348 id: model.id,
349 partition: model.partition,
350 metadata,
351 }))
352 }
353 None => Ok(None),
354 }
355 }
356
357 async fn get_all_metadata(
358 &self,
359 partition: &str,
360 start: usize,
361 count: usize,
362 ) -> Result<Vec<OfferMetadata>, Self::Error> {
363 let models = OfferMetadataTable::find()
364 .filter(offer_metadata_table::Column::Partition.eq(partition))
365 .order_by_asc(offer_metadata_table::Column::CreatedAt)
366 .order_by_asc(offer_metadata_table::Column::Id)
367 .offset(start as u64)
368 .limit(count as u64)
369 .all(&self.db)
370 .await
371 .map_err(|e| {
372 OfferStoreError::from_db(
373 ServiceErrorSource::Internal,
374 format!("getting all metadata for partition {partition}"),
375 e,
376 )
377 })?;
378
379 let mut metadata_list = Vec::new();
380 for model in models {
381 let metadata = serde_json::from_value(model.metadata).map_err(|e| {
382 OfferStoreError::serialization_error(
383 ServiceErrorSource::Internal,
384 format!(
385 "deserializing metadata for partition {} id {}",
386 partition, model.id
387 ),
388 e,
389 )
390 })?;
391
392 metadata_list.push(OfferMetadata {
393 id: model.id,
394 partition: model.partition,
395 metadata,
396 });
397 }
398
399 Ok(metadata_list)
400 }
401
402 async fn post_metadata(&self, offer: OfferMetadata) -> Result<Option<Uuid>, Self::Error> {
403 let metadata_json = serde_json::to_value(&offer.metadata).map_err(|e| {
404 OfferStoreError::serialization_error(
405 ServiceErrorSource::Internal,
406 format!(
407 "serializing metadata for partition {} id {}",
408 offer.partition, offer.id
409 ),
410 e,
411 )
412 })?;
413
414 let now = Utc::now();
415 let active_model = offer_metadata_table::ActiveModel {
416 id: Set(offer.id),
417 partition: Set(offer.partition.clone()),
418 metadata: Set(metadata_json),
419 created_at: Set(now.into()),
420 updated_at: Set(now.into()),
421 };
422
423 match OfferMetadataTable::insert(active_model)
424 .exec(&self.db)
425 .await
426 {
427 Ok(_) => Ok(Some(offer.id)),
428 Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
430 db_err,
431 )))) if db_err.is_unique_violation() => Ok(None),
432 Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
434 db_err,
435 )))) if db_err.is_unique_violation() => Ok(None),
436 Err(e) => Err(OfferStoreError::from_db(
437 ServiceErrorSource::Internal,
438 format!(
439 "inserting metadata for partition {} id {}",
440 offer.partition, offer.id
441 ),
442 e,
443 )),
444 }
445 }
446
447 async fn put_metadata(&self, offer: OfferMetadata) -> Result<bool, Self::Error> {
448 let metadata_json = serde_json::to_value(&offer.metadata).map_err(|e| {
449 OfferStoreError::serialization_error(
450 ServiceErrorSource::Internal,
451 format!(
452 "serializing metadata for partition {} id {}",
453 offer.partition, offer.id
454 ),
455 e,
456 )
457 })?;
458
459 let now = Utc::now();
460 let future_timestamp = now + chrono::Duration::seconds(1);
461
462 let active_model = offer_metadata_table::ActiveModel {
463 id: Set(offer.id),
464 partition: Set(offer.partition.clone()),
465 metadata: Set(metadata_json),
466 created_at: Set(now.into()), updated_at: Set(now.into()),
468 };
469
470 let _result = OfferMetadataTable::insert(active_model)
471 .on_conflict(
472 OnConflict::columns([
473 offer_metadata_table::Column::Partition,
474 offer_metadata_table::Column::Id,
475 ])
476 .update_columns([offer_metadata_table::Column::Metadata])
477 .value(Column::UpdatedAt, Expr::val(future_timestamp))
478 .to_owned(),
479 )
480 .exec(&self.db)
481 .await
482 .map_err(|e| {
483 OfferStoreError::from_db(
484 ServiceErrorSource::Internal,
485 format!(
486 "upserting metadata for partition {} id {}",
487 offer.partition, offer.id
488 ),
489 e,
490 )
491 })?;
492
493 let result = OfferMetadataTable::find()
495 .filter(offer_metadata_table::Column::Partition.eq(offer.partition.clone()))
496 .filter(offer_metadata_table::Column::Id.eq(offer.id))
497 .select_only()
498 .column(offer_metadata_table::Column::CreatedAt)
499 .column(offer_metadata_table::Column::UpdatedAt)
500 .into_tuple::<(
501 chrono::DateTime<chrono::FixedOffset>,
502 chrono::DateTime<chrono::FixedOffset>,
503 )>()
504 .one(&self.db)
505 .await
506 .map_err(|e| {
507 OfferStoreError::from_db(
508 ServiceErrorSource::Internal,
509 format!(
510 "fetching metadata after upsert for partition {} id {}",
511 offer.partition, offer.id
512 ),
513 e,
514 )
515 })?
516 .ok_or_else(|| {
517 OfferStoreError::from_db(
518 ServiceErrorSource::Internal,
519 "upsert succeeded but record not found",
520 sea_orm::DbErr::RecordNotFound(
521 "Record should exist after successful upsert".to_string(),
522 ),
523 )
524 })?;
525
526 Ok(result.0 == result.1)
528 }
529
530 async fn delete_metadata(&self, partition: &str, id: &Uuid) -> Result<bool, Self::Error> {
531 let result = OfferMetadataTable::delete_by_id((partition.to_string(), *id))
532 .exec(&self.db)
533 .await
534 .map_err(|e| match e {
535 sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
536 db_err,
537 ))) if db_err.is_foreign_key_violation()
538 || db_err.code().as_deref() == Some("1811") =>
540 {
541 OfferStoreError::invalid_input_error(
542 format!("deleting metadata for partition {partition} id {id}"),
543 format!("metadata {} is referenced by existing offers", id),
544 )
545 }
546 _ => OfferStoreError::from_db(
547 ServiceErrorSource::Internal,
548 format!("deleting metadata for partition {partition} id {id}"),
549 e,
550 ),
551 })?;
552
553 Ok(result.rows_affected > 0)
554 }
555}
556
557#[async_trait]
558impl OfferProvider for DbOfferStore {
559 type Error = OfferStoreError;
560
561 async fn offer(
562 &self,
563 _hostname: &str,
564 partition: &str,
565 id: &Uuid,
566 ) -> Result<Option<Offer>, Self::Error> {
567 let result = OfferRecordTable::find_by_id((partition.to_string(), *id))
568 .find_also_related(OfferMetadataTable)
569 .one(&self.db)
570 .await
571 .map_err(|e| {
572 OfferStoreError::from_db(
573 ServiceErrorSource::Internal,
574 format!("getting offer with metadata for partition {partition} id {id}",),
575 e,
576 )
577 })?;
578
579 let (offer_model, metadata_model) = match result {
580 Some((offer, Some(metadata))) => (offer, metadata),
581 _ => return Ok(None),
582 };
583
584 use crate::api::offer::OfferMetadataSparse;
585 let metadata_sparse: OfferMetadataSparse = serde_json::from_value(metadata_model.metadata)
586 .map_err(|e| {
587 OfferStoreError::serialization_error(
588 ServiceErrorSource::Internal,
589 format!("deserializing metadata for offer {id} in partition {partition}",),
590 e,
591 )
592 })?;
593
594 use crate::api::lnurl::LnUrlOfferMetadata;
595 let lnurl_metadata = LnUrlOfferMetadata(metadata_sparse);
596 let metadata_json_string = serde_json::to_string(&lnurl_metadata).map_err(|e| {
597 OfferStoreError::serialization_error(
598 ServiceErrorSource::Internal,
599 format!("serializing metadata for offer {id} in partition {partition}",),
600 e,
601 )
602 })?;
603
604 let mut hasher = Sha256::new();
605 hasher.update(metadata_json_string.as_bytes());
606 let metadata_json_hash = hasher.finalize().into();
607
608 Ok(Some(Offer {
609 partition: offer_model.partition,
610 id: offer_model.id,
611 max_sendable: offer_model.max_sendable as u64,
612 min_sendable: offer_model.min_sendable as u64,
613 metadata_json_string,
614 metadata_json_hash,
615 timestamp: offer_model.timestamp.with_timezone(&Utc),
616 expires: offer_model.expires.map(|dt| dt.with_timezone(&Utc)),
617 }))
618 }
619}