1use crate::discovery::db::Column;
2use crate::offer::db_orm::prelude::*;
3use crate::offer::db_orm::{offer_metadata_table, offer_record_table};
4use crate::offer::error::OfferStoreError;
5use async_trait::async_trait;
6use chrono::Utc;
7use sea_orm::{
8 ColumnTrait, Database, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder, QuerySelect,
9 Set,
10};
11use sha2::{Digest, Sha256};
12use switchgear_migration::OnConflict;
13use switchgear_migration::{Expr, MigratorTrait};
14use switchgear_service_api::lnurl::LnUrlOfferMetadata;
15use switchgear_service_api::offer::{
16 Offer, OfferMetadata, OfferMetadataSparse, OfferMetadataStore, OfferProvider, OfferRecord,
17 OfferRecordSparse, OfferStore,
18};
19use switchgear_service_api::service::ServiceErrorSource;
20use uuid::Uuid;
21
22#[derive(Clone, Debug)]
23pub struct DbOfferStore {
24 db: DatabaseConnection,
25}
26
27impl DbOfferStore {
28 pub async fn connect(uri: &str, max_connections: u32) -> Result<Self, OfferStoreError> {
29 let mut opt = sea_orm::ConnectOptions::new(uri);
30 opt.max_connections(max_connections);
31 let db = Database::connect(opt).await.map_err(|e| {
32 OfferStoreError::from_db(
33 ServiceErrorSource::Internal,
34 "connecting to offer database",
35 e,
36 )
37 })?;
38
39 Ok(Self::from_db(db))
40 }
41
42 pub async fn migrate_up(&self) -> Result<(), OfferStoreError> {
43 switchgear_migration::OfferMigrator::up(&self.db, None)
44 .await
45 .map_err(|e| {
46 OfferStoreError::from_db(ServiceErrorSource::Internal, "migrating database up", e)
47 })?;
48 Ok(())
49 }
50
51 pub async fn migrate_down(&self) -> Result<(), OfferStoreError> {
52 switchgear_migration::OfferMigrator::down(&self.db, None)
53 .await
54 .map_err(|e| {
55 OfferStoreError::from_db(ServiceErrorSource::Internal, "migrating database down", e)
56 })?;
57 Ok(())
58 }
59
60 pub fn from_db(db: DatabaseConnection) -> Self {
61 Self { db }
62 }
63}
64
65#[async_trait]
66impl OfferStore for DbOfferStore {
67 type Error = OfferStoreError;
68
69 async fn get_offer(
70 &self,
71 partition: &str,
72 id: &Uuid,
73 ) -> Result<Option<OfferRecord>, Self::Error> {
74 let model = OfferRecordTable::find_by_id((partition.to_string(), *id))
75 .one(&self.db)
76 .await
77 .map_err(|e| {
78 OfferStoreError::from_db(
79 ServiceErrorSource::Internal,
80 format!("getting offer for partition {partition} id {id}"),
81 e,
82 )
83 })?;
84
85 match model {
86 Some(model) => Ok(Some(OfferRecord {
87 partition: model.partition,
88 id: model.id,
89 offer: OfferRecordSparse {
90 max_sendable: model.max_sendable as u64,
91 min_sendable: model.min_sendable as u64,
92 metadata_id: model.metadata_id,
93 timestamp: model.timestamp.into(),
94 expires: model.expires.map(|dt| dt.into()),
95 },
96 })),
97 None => Ok(None),
98 }
99 }
100
101 async fn get_offers(
102 &self,
103 partition: &str,
104 start: usize,
105 count: usize,
106 ) -> Result<Vec<OfferRecord>, Self::Error> {
107 let models = OfferRecordTable::find()
108 .filter(offer_record_table::Column::Partition.eq(partition))
109 .order_by_asc(offer_record_table::Column::CreatedAt)
110 .order_by_asc(offer_record_table::Column::Id)
111 .offset(start as u64)
112 .limit(count as u64)
113 .all(&self.db)
114 .await
115 .map_err(|e| {
116 OfferStoreError::from_db(
117 ServiceErrorSource::Internal,
118 format!("getting offers for partition {partition}"),
119 e,
120 )
121 })?;
122
123 let mut offers = Vec::new();
124 for model in models {
125 offers.push(OfferRecord {
126 partition: model.partition,
127 id: model.id,
128 offer: OfferRecordSparse {
129 max_sendable: model.max_sendable as u64,
130 min_sendable: model.min_sendable as u64,
131 metadata_id: model.metadata_id,
132 timestamp: model.timestamp.into(),
133 expires: model.expires.map(|dt| dt.into()),
134 },
135 });
136 }
137
138 Ok(offers)
139 }
140
141 async fn post_offer(&self, offer: OfferRecord) -> Result<Option<Uuid>, Self::Error> {
142 let now = Utc::now();
143 let active_model = offer_record_table::ActiveModel {
144 id: Set(offer.id),
145 partition: Set(offer.partition.clone()),
146 max_sendable: Set(offer.offer.max_sendable as i64),
147 min_sendable: Set(offer.offer.min_sendable as i64),
148 metadata_id: Set(offer.offer.metadata_id),
149 timestamp: Set(offer.offer.timestamp.into()),
150 expires: Set(offer.offer.expires.map(|dt| dt.into())),
151 created_at: Set(now.into()),
152 updated_at: Set(now.into()),
153 };
154
155 match OfferRecordTable::insert(active_model).exec(&self.db).await {
156 Ok(_) => Ok(Some(offer.id)),
157 Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
159 db_err,
160 )))) if db_err.is_unique_violation() => Ok(None),
161 Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
163 db_err,
164 )))) if db_err.is_unique_violation() => Ok(None),
165 Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
167 db_err,
168 )))) if db_err.is_foreign_key_violation() => Err(OfferStoreError::invalid_input_error(
169 format!("post offer {offer:?}"),
170 format!(
171 "metadata {} not found for offer {}",
172 offer.offer.metadata_id, offer.id
173 ),
174 )),
175 Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
176 db_err,
177 )))) if db_err.is_foreign_key_violation() => Err(OfferStoreError::invalid_input_error(
178 format!("post offer {offer:?}"),
179 format!(
180 "metadata {} not found for offer {}",
181 offer.offer.metadata_id, offer.id
182 ),
183 )),
184 Err(e) => Err(OfferStoreError::from_db(
185 ServiceErrorSource::Internal,
186 format!(
187 "inserting offer for partition {} id {}",
188 offer.partition, offer.id
189 ),
190 e,
191 )),
192 }
193 }
194
195 async fn put_offer(&self, offer: OfferRecord) -> Result<bool, Self::Error> {
196 let now = Utc::now();
197 let future_timestamp = now + chrono::Duration::seconds(1);
198
199 let active_model = offer_record_table::ActiveModel {
200 id: Set(offer.id),
201 partition: Set(offer.partition.clone()),
202 max_sendable: Set(offer.offer.max_sendable as i64),
203 min_sendable: Set(offer.offer.min_sendable as i64),
204 metadata_id: Set(offer.offer.metadata_id),
205 timestamp: Set(offer.offer.timestamp.into()),
206 expires: Set(offer.offer.expires.map(|dt| dt.into())),
207 created_at: Set(now.into()), updated_at: Set(now.into()),
209 };
210
211 let _result = match OfferRecordTable::insert(active_model)
212 .on_conflict(
213 OnConflict::columns([
214 offer_record_table::Column::Partition,
215 offer_record_table::Column::Id,
216 ])
217 .update_columns([
218 offer_record_table::Column::MaxSendable,
219 offer_record_table::Column::MinSendable,
220 offer_record_table::Column::MetadataId,
221 offer_record_table::Column::Timestamp,
222 offer_record_table::Column::Expires,
223 ])
224 .value(Column::UpdatedAt, Expr::val(future_timestamp))
225 .to_owned(),
226 )
227 .exec(&self.db)
228 .await
229 {
230 Ok(result) => result,
231 Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
233 db_err,
234 )))) if db_err.is_foreign_key_violation() => {
235 return Err(OfferStoreError::invalid_input_error(
236 format!("put offer {offer:?}"),
237 format!(
238 "metadata {} not found for offer {}",
239 offer.offer.metadata_id, offer.id
240 ),
241 ));
242 }
243 Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
244 db_err,
245 )))) if db_err.is_foreign_key_violation() => {
246 return Err(OfferStoreError::invalid_input_error(
247 format!("put offer {offer:?}"),
248 format!(
249 "metadata {} not found for offer {}",
250 offer.offer.metadata_id, offer.id
251 ),
252 ));
253 }
254 Err(e) => {
255 return Err(OfferStoreError::from_db(
256 ServiceErrorSource::Internal,
257 format!(
258 "upserting offer for partition {} id {}",
259 offer.partition, offer.id
260 ),
261 e,
262 ));
263 }
264 };
265
266 let result = OfferRecordTable::find()
268 .filter(offer_record_table::Column::Partition.eq(offer.partition.clone()))
269 .filter(offer_record_table::Column::Id.eq(offer.id))
270 .select_only()
271 .column(offer_record_table::Column::CreatedAt)
272 .column(offer_record_table::Column::UpdatedAt)
273 .into_tuple::<(
274 chrono::DateTime<chrono::FixedOffset>,
275 chrono::DateTime<chrono::FixedOffset>,
276 )>()
277 .one(&self.db)
278 .await
279 .map_err(|e| {
280 OfferStoreError::from_db(
281 ServiceErrorSource::Internal,
282 format!(
283 "fetching offer after upsert for partition {} id {}",
284 offer.partition, offer.id
285 ),
286 e,
287 )
288 })?
289 .ok_or_else(|| {
290 OfferStoreError::from_db(
291 ServiceErrorSource::Internal,
292 "upsert succeeded but record not found",
293 sea_orm::DbErr::RecordNotFound(
294 "Record should exist after successful upsert".to_string(),
295 ),
296 )
297 })?;
298
299 Ok(result.0 == result.1)
301 }
302
303 async fn delete_offer(&self, partition: &str, id: &Uuid) -> Result<bool, Self::Error> {
304 let result = OfferRecordTable::delete_by_id((partition.to_string(), *id))
305 .exec(&self.db)
306 .await
307 .map_err(|e| {
308 OfferStoreError::from_db(
309 ServiceErrorSource::Internal,
310 format!("deleting offer for partition {partition} id {id}"),
311 e,
312 )
313 })?;
314
315 Ok(result.rows_affected > 0)
316 }
317}
318
319#[async_trait]
320impl OfferMetadataStore for DbOfferStore {
321 type Error = OfferStoreError;
322
323 async fn get_metadata(
324 &self,
325 partition: &str,
326 id: &Uuid,
327 ) -> Result<Option<OfferMetadata>, Self::Error> {
328 let model = OfferMetadataTable::find_by_id((partition.to_string(), *id))
329 .one(&self.db)
330 .await
331 .map_err(|e| {
332 OfferStoreError::from_db(
333 ServiceErrorSource::Internal,
334 format!("getting metadata for partition {partition} id {id}"),
335 e,
336 )
337 })?;
338
339 match model {
340 Some(model) => {
341 let metadata = serde_json::from_value(model.metadata).map_err(|e| {
342 OfferStoreError::serialization_error(
343 ServiceErrorSource::Internal,
344 format!("deserializing metadata for partition {partition} id {id}",),
345 e,
346 )
347 })?;
348
349 Ok(Some(OfferMetadata {
350 id: model.id,
351 partition: model.partition,
352 metadata,
353 }))
354 }
355 None => Ok(None),
356 }
357 }
358
359 async fn get_all_metadata(
360 &self,
361 partition: &str,
362 start: usize,
363 count: usize,
364 ) -> Result<Vec<OfferMetadata>, Self::Error> {
365 let models = OfferMetadataTable::find()
366 .filter(offer_metadata_table::Column::Partition.eq(partition))
367 .order_by_asc(offer_metadata_table::Column::CreatedAt)
368 .order_by_asc(offer_metadata_table::Column::Id)
369 .offset(start as u64)
370 .limit(count as u64)
371 .all(&self.db)
372 .await
373 .map_err(|e| {
374 OfferStoreError::from_db(
375 ServiceErrorSource::Internal,
376 format!("getting all metadata for partition {partition}"),
377 e,
378 )
379 })?;
380
381 let mut metadata_list = Vec::new();
382 for model in models {
383 let metadata = serde_json::from_value(model.metadata).map_err(|e| {
384 OfferStoreError::serialization_error(
385 ServiceErrorSource::Internal,
386 format!(
387 "deserializing metadata for partition {} id {}",
388 partition, model.id
389 ),
390 e,
391 )
392 })?;
393
394 metadata_list.push(OfferMetadata {
395 id: model.id,
396 partition: model.partition,
397 metadata,
398 });
399 }
400
401 Ok(metadata_list)
402 }
403
404 async fn post_metadata(&self, offer: OfferMetadata) -> Result<Option<Uuid>, Self::Error> {
405 let metadata_json = serde_json::to_value(&offer.metadata).map_err(|e| {
406 OfferStoreError::serialization_error(
407 ServiceErrorSource::Internal,
408 format!(
409 "serializing metadata for partition {} id {}",
410 offer.partition, offer.id
411 ),
412 e,
413 )
414 })?;
415
416 let now = Utc::now();
417 let active_model = offer_metadata_table::ActiveModel {
418 id: Set(offer.id),
419 partition: Set(offer.partition.clone()),
420 metadata: Set(metadata_json),
421 created_at: Set(now.into()),
422 updated_at: Set(now.into()),
423 };
424
425 match OfferMetadataTable::insert(active_model)
426 .exec(&self.db)
427 .await
428 {
429 Ok(_) => Ok(Some(offer.id)),
430 Err(sea_orm::DbErr::Query(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
432 db_err,
433 )))) if db_err.is_unique_violation() => Ok(None),
434 Err(sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
436 db_err,
437 )))) if db_err.is_unique_violation() => Ok(None),
438 Err(e) => Err(OfferStoreError::from_db(
439 ServiceErrorSource::Internal,
440 format!(
441 "inserting metadata for partition {} id {}",
442 offer.partition, offer.id
443 ),
444 e,
445 )),
446 }
447 }
448
449 async fn put_metadata(&self, offer: OfferMetadata) -> Result<bool, Self::Error> {
450 let metadata_json = serde_json::to_value(&offer.metadata).map_err(|e| {
451 OfferStoreError::serialization_error(
452 ServiceErrorSource::Internal,
453 format!(
454 "serializing metadata for partition {} id {}",
455 offer.partition, offer.id
456 ),
457 e,
458 )
459 })?;
460
461 let now = Utc::now();
462 let future_timestamp = now + chrono::Duration::seconds(1);
463
464 let active_model = offer_metadata_table::ActiveModel {
465 id: Set(offer.id),
466 partition: Set(offer.partition.clone()),
467 metadata: Set(metadata_json),
468 created_at: Set(now.into()), updated_at: Set(now.into()),
470 };
471
472 let _result = OfferMetadataTable::insert(active_model)
473 .on_conflict(
474 OnConflict::columns([
475 offer_metadata_table::Column::Partition,
476 offer_metadata_table::Column::Id,
477 ])
478 .update_columns([offer_metadata_table::Column::Metadata])
479 .value(Column::UpdatedAt, Expr::val(future_timestamp))
480 .to_owned(),
481 )
482 .exec(&self.db)
483 .await
484 .map_err(|e| {
485 OfferStoreError::from_db(
486 ServiceErrorSource::Internal,
487 format!(
488 "upserting metadata for partition {} id {}",
489 offer.partition, offer.id
490 ),
491 e,
492 )
493 })?;
494
495 let result = OfferMetadataTable::find()
497 .filter(offer_metadata_table::Column::Partition.eq(offer.partition.clone()))
498 .filter(offer_metadata_table::Column::Id.eq(offer.id))
499 .select_only()
500 .column(offer_metadata_table::Column::CreatedAt)
501 .column(offer_metadata_table::Column::UpdatedAt)
502 .into_tuple::<(
503 chrono::DateTime<chrono::FixedOffset>,
504 chrono::DateTime<chrono::FixedOffset>,
505 )>()
506 .one(&self.db)
507 .await
508 .map_err(|e| {
509 OfferStoreError::from_db(
510 ServiceErrorSource::Internal,
511 format!(
512 "fetching metadata after upsert for partition {} id {}",
513 offer.partition, offer.id
514 ),
515 e,
516 )
517 })?
518 .ok_or_else(|| {
519 OfferStoreError::from_db(
520 ServiceErrorSource::Internal,
521 "upsert succeeded but record not found",
522 sea_orm::DbErr::RecordNotFound(
523 "Record should exist after successful upsert".to_string(),
524 ),
525 )
526 })?;
527
528 Ok(result.0 == result.1)
530 }
531
532 async fn delete_metadata(&self, partition: &str, id: &Uuid) -> Result<bool, Self::Error> {
533 let result = OfferMetadataTable::delete_by_id((partition.to_string(), *id))
534 .exec(&self.db)
535 .await
536 .map_err(|e| match e {
537 sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx::Error::Database(
538 db_err,
539 ))) if db_err.is_foreign_key_violation()
540 || db_err.code().as_deref() == Some("1811") =>
542 {
543 OfferStoreError::invalid_input_error(
544 format!("deleting metadata for partition {partition} id {id}"),
545 format!("metadata {} is referenced by existing offers", id),
546 )
547 }
548 _ => OfferStoreError::from_db(
549 ServiceErrorSource::Internal,
550 format!("deleting metadata for partition {partition} id {id}"),
551 e,
552 ),
553 })?;
554
555 Ok(result.rows_affected > 0)
556 }
557}
558
559#[async_trait]
560impl OfferProvider for DbOfferStore {
561 type Error = OfferStoreError;
562
563 async fn offer(
564 &self,
565 _hostname: &str,
566 partition: &str,
567 id: &Uuid,
568 ) -> Result<Option<Offer>, Self::Error> {
569 let result = OfferRecordTable::find_by_id((partition.to_string(), *id))
570 .find_also_related(OfferMetadataTable)
571 .one(&self.db)
572 .await
573 .map_err(|e| {
574 OfferStoreError::from_db(
575 ServiceErrorSource::Internal,
576 format!("getting offer with metadata for partition {partition} id {id}",),
577 e,
578 )
579 })?;
580
581 let (offer_model, metadata_model) = match result {
582 Some((offer, Some(metadata))) => (offer, metadata),
583 _ => return Ok(None),
584 };
585
586 let metadata_sparse: OfferMetadataSparse = serde_json::from_value(metadata_model.metadata)
587 .map_err(|e| {
588 OfferStoreError::serialization_error(
589 ServiceErrorSource::Internal,
590 format!("deserializing metadata for offer {id} in partition {partition}",),
591 e,
592 )
593 })?;
594
595 let lnurl_metadata = LnUrlOfferMetadata(metadata_sparse);
596 let metadata_json_string = serde_json::to_string(&lnurl_metadata).map_err(|e| {
597 OfferStoreError::serialization_error(
598 ServiceErrorSource::Internal,
599 format!("serializing metadata for offer {id} in partition {partition}",),
600 e,
601 )
602 })?;
603
604 let mut hasher = Sha256::new();
605 hasher.update(metadata_json_string.as_bytes());
606 let metadata_json_hash = hasher.finalize().into();
607
608 Ok(Some(Offer {
609 partition: offer_model.partition,
610 id: offer_model.id,
611 max_sendable: offer_model.max_sendable as u64,
612 min_sendable: offer_model.min_sendable as u64,
613 metadata_json_string,
614 metadata_json_hash,
615 timestamp: offer_model.timestamp.with_timezone(&Utc),
616 expires: offer_model.expires.map(|dt| dt.with_timezone(&Utc)),
617 }))
618 }
619}