1use async_trait::async_trait;
5use bandwidth::BandwidthManager;
6use clients::{ClientManager, ClientType};
7use models::{
8 Client, PersistedBandwidth, PersistedSharedKeys, RedemptionProposal, StoredMessage,
9 VerifiedTicket, WireguardPeer,
10};
11use nym_credentials_interface::ClientTicket;
12use nym_gateway_requests::shared_key::SharedSymmetricKey;
13use nym_sphinx::DestinationAddressBytes;
14use shared_keys::SharedKeysManager;
15use sqlx::{
16 ConnectOptions,
17 sqlite::{SqliteAutoVacuum, SqliteSynchronous},
18};
19use std::{path::Path, time::Duration};
20use tickets::TicketStorageManager;
21use time::OffsetDateTime;
22use tracing::{debug, error, log::LevelFilter};
23
24pub mod bandwidth;
25mod clients;
26pub mod error;
27mod inboxes;
28pub mod models;
29mod shared_keys;
30mod tickets;
31pub mod traits;
32mod wireguard_peers;
33
34pub use error::GatewayStorageError;
35pub use inboxes::InboxManager;
36
37use crate::traits::{BandwidthGatewayStorage, InboxGatewayStorage, SharedKeyGatewayStorage};
38
39fn make_bincode_serializer() -> impl bincode::Options {
40 use bincode::Options;
41 bincode::DefaultOptions::new()
42 .with_big_endian()
43 .with_varint_encoding()
44}
45
46#[derive(Clone)]
48pub struct GatewayStorage {
49 client_manager: ClientManager,
50 shared_key_manager: SharedKeysManager,
51 inbox_manager: InboxManager,
52 bandwidth_manager: BandwidthManager,
53 ticket_manager: TicketStorageManager,
54 wireguard_peer_manager: wireguard_peers::WgPeerManager,
55}
56
57impl GatewayStorage {
58 #[allow(dead_code)]
59 pub(crate) fn client_manager(&self) -> &ClientManager {
60 &self.client_manager
61 }
62
63 pub(crate) fn shared_key_manager(&self) -> &SharedKeysManager {
64 &self.shared_key_manager
65 }
66
67 pub fn inbox_manager(&self) -> &InboxManager {
68 &self.inbox_manager
69 }
70
71 pub(crate) fn bandwidth_manager(&self) -> &BandwidthManager {
72 &self.bandwidth_manager
73 }
74
75 #[allow(dead_code)]
76 pub(crate) fn ticket_manager(&self) -> &TicketStorageManager {
77 &self.ticket_manager
78 }
79
80 #[allow(dead_code)]
81 pub(crate) fn wireguard_peer_manager(&self) -> &wireguard_peers::WgPeerManager {
82 &self.wireguard_peer_manager
83 }
84
85 pub async fn handle_forget_me(
86 &self,
87 client_address: DestinationAddressBytes,
88 ) -> Result<(), GatewayStorageError> {
89 let client_id = self.get_mixnet_client_id(client_address).await?;
90 self.inbox_manager()
91 .remove_messages_for_client(&client_address.as_base58_string())
92 .await?;
93 self.bandwidth_manager().remove_client(client_id).await?;
94 self.shared_key_manager()
95 .remove_shared_keys(&client_address.as_base58_string())
96 .await?;
97 Ok(())
98 }
99
100 pub async fn init<P: AsRef<Path> + Send>(
107 database_path: P,
108 message_retrieval_limit: i64,
109 ) -> Result<Self, GatewayStorageError> {
110 debug!(
111 "Attempting to connect to database {}",
112 database_path.as_ref().display()
113 );
114
115 let opts = sqlx::sqlite::SqliteConnectOptions::new()
118 .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
119 .synchronous(SqliteSynchronous::Normal)
120 .auto_vacuum(SqliteAutoVacuum::Incremental)
121 .log_slow_statements(LevelFilter::Warn, Duration::from_millis(250))
122 .filename(database_path)
123 .create_if_missing(true)
124 .disable_statement_logging();
125
126 let connection_pool = match sqlx::SqlitePool::connect_with(opts).await {
129 Ok(db) => db,
130 Err(err) => {
131 error!("Failed to connect to SQLx database: {err}");
132 return Err(err.into());
133 }
134 };
135
136 Self::from_connection_pool(connection_pool, message_retrieval_limit).await
137 }
138
139 pub async fn from_connection_pool(
140 connection_pool: sqlx::sqlite::SqlitePool,
141 message_retrieval_limit: i64,
142 ) -> Result<Self, GatewayStorageError> {
143 if let Err(err) = sqlx::migrate!("./migrations").run(&connection_pool).await {
144 error!("Failed to perform migration on the SQLx database: {err}");
145 return Err(err.into());
146 }
147
148 Ok(GatewayStorage {
150 client_manager: clients::ClientManager::new(connection_pool.clone()),
151 wireguard_peer_manager: wireguard_peers::WgPeerManager::new(connection_pool.clone()),
152 shared_key_manager: SharedKeysManager::new(connection_pool.clone()),
153 inbox_manager: InboxManager::new(connection_pool.clone(), message_retrieval_limit),
154 bandwidth_manager: BandwidthManager::new(connection_pool.clone()),
155 ticket_manager: TicketStorageManager::new(connection_pool),
156 })
157 }
158}
159
160#[async_trait]
161impl SharedKeyGatewayStorage for GatewayStorage {
162 async fn get_mixnet_client_id(
163 &self,
164 client_address: DestinationAddressBytes,
165 ) -> Result<i64, GatewayStorageError> {
166 Ok(self
167 .shared_key_manager
168 .client_id(&client_address.as_base58_string())
169 .await?)
170 }
171
172 async fn insert_shared_keys(
173 &self,
174 client_address: DestinationAddressBytes,
175 shared_keys: &SharedSymmetricKey,
176 ) -> Result<i64, GatewayStorageError> {
177 let client_address_bs58 = client_address.as_base58_string();
178 let client_id = match self
179 .shared_key_manager
180 .client_id(&client_address_bs58)
181 .await
182 {
183 Ok(client_id) => client_id,
184 _ => {
185 self.client_manager
186 .insert_client(ClientType::EntryMixnet)
187 .await?
188 }
189 };
190 self.shared_key_manager
191 .insert_shared_keys(
192 client_id,
193 client_address_bs58,
194 shared_keys.to_bytes().as_ref(),
195 )
196 .await?;
197 Ok(client_id)
198 }
199
200 async fn get_shared_keys(
201 &self,
202 client_address: DestinationAddressBytes,
203 ) -> Result<Option<PersistedSharedKeys>, GatewayStorageError> {
204 let keys = self
205 .shared_key_manager
206 .get_shared_keys(&client_address.as_base58_string())
207 .await?;
208 Ok(keys)
209 }
210
211 #[allow(dead_code)]
212 async fn remove_shared_keys(
213 &self,
214 client_address: DestinationAddressBytes,
215 ) -> Result<(), GatewayStorageError> {
216 self.shared_key_manager
217 .remove_shared_keys(&client_address.as_base58_string())
218 .await?;
219 Ok(())
220 }
221
222 async fn update_last_used_authentication_timestamp(
223 &self,
224 client_id: i64,
225 last_used_authentication_timestamp: OffsetDateTime,
226 ) -> Result<(), GatewayStorageError> {
227 self.shared_key_manager
228 .update_last_used_authentication_timestamp(
229 client_id,
230 last_used_authentication_timestamp,
231 )
232 .await?;
233 Ok(())
234 }
235
236 async fn get_client(&self, client_id: i64) -> Result<Option<Client>, GatewayStorageError> {
237 let client = self.client_manager.get_client(client_id).await?;
238 Ok(client)
239 }
240}
241
242#[async_trait]
243impl InboxGatewayStorage for GatewayStorage {
244 async fn store_message(
245 &self,
246 client_address: DestinationAddressBytes,
247 message: Vec<u8>,
248 ) -> Result<(), GatewayStorageError> {
249 self.inbox_manager
250 .insert_message(&client_address.as_base58_string(), message)
251 .await?;
252 Ok(())
253 }
254
255 async fn retrieve_messages(
256 &self,
257 client_address: DestinationAddressBytes,
258 start_after: Option<i64>,
259 ) -> Result<(Vec<StoredMessage>, Option<i64>), GatewayStorageError> {
260 let messages = self
261 .inbox_manager
262 .get_messages(&client_address.as_base58_string(), start_after)
263 .await?;
264 Ok(messages)
265 }
266
267 async fn remove_messages(&self, ids: Vec<i64>) -> Result<(), GatewayStorageError> {
268 for id in ids {
269 self.inbox_manager.remove_message(id).await?;
270 }
271 Ok(())
272 }
273}
274
275#[async_trait]
276impl BandwidthGatewayStorage for GatewayStorage {
277 async fn create_bandwidth_entry(&self, client_id: i64) -> Result<(), GatewayStorageError> {
278 self.bandwidth_manager.insert_new_client(client_id).await?;
279 Ok(())
280 }
281
282 async fn set_expiration(
283 &self,
284 client_id: i64,
285 expiration: OffsetDateTime,
286 ) -> Result<(), GatewayStorageError> {
287 self.bandwidth_manager
288 .set_expiration(client_id, expiration)
289 .await?;
290 Ok(())
291 }
292
293 async fn reset_bandwidth(&self, client_id: i64) -> Result<(), GatewayStorageError> {
294 self.bandwidth_manager.reset_bandwidth(client_id).await?;
295 Ok(())
296 }
297
298 async fn get_available_bandwidth(
299 &self,
300 client_id: i64,
301 ) -> Result<Option<PersistedBandwidth>, GatewayStorageError> {
302 Ok(self
303 .bandwidth_manager
304 .get_available_bandwidth(client_id)
305 .await?)
306 }
307
308 async fn increase_bandwidth(
309 &self,
310 client_id: i64,
311 amount: i64,
312 ) -> Result<i64, GatewayStorageError> {
313 Ok(self
314 .bandwidth_manager
315 .increase_bandwidth(client_id, amount)
316 .await?)
317 }
318
319 async fn revoke_ticket_bandwidth(
320 &self,
321 ticket_id: i64,
322 amount: i64,
323 ) -> Result<(), GatewayStorageError> {
324 Ok(self
325 .bandwidth_manager
326 .revoke_ticket_bandwidth(ticket_id, amount)
327 .await?)
328 }
329
330 async fn decrease_bandwidth(
331 &self,
332 client_id: i64,
333 amount: i64,
334 ) -> Result<i64, GatewayStorageError> {
335 Ok(self
336 .bandwidth_manager
337 .decrease_bandwidth(client_id, amount)
338 .await?)
339 }
340
341 async fn insert_epoch_signers(
342 &self,
343 epoch_id: i64,
344 signer_ids: Vec<i64>,
345 ) -> Result<(), GatewayStorageError> {
346 self.ticket_manager
347 .insert_ecash_signers(epoch_id, signer_ids)
348 .await?;
349 Ok(())
350 }
351
352 async fn insert_received_ticket(
353 &self,
354 client_id: i64,
355 received_at: OffsetDateTime,
356 serial_number: Vec<u8>,
357 data: Vec<u8>,
358 ) -> Result<i64, GatewayStorageError> {
359 let ticket_id = self
362 .ticket_manager
363 .insert_new_ticket(client_id, received_at)
364 .await?;
365 self.ticket_manager
366 .insert_ticket_data(ticket_id, &serial_number, &data)
367 .await?;
368
369 Ok(ticket_id)
370 }
371
372 async fn contains_ticket(&self, serial_number: &[u8]) -> Result<bool, GatewayStorageError> {
373 Ok(self.ticket_manager.has_ticket_data(serial_number).await?)
374 }
375
376 async fn insert_ticket_verification(
377 &self,
378 ticket_id: i64,
379 signer_id: i64,
380 verified_at: OffsetDateTime,
381 accepted: bool,
382 ) -> Result<(), GatewayStorageError> {
383 self.ticket_manager
384 .insert_ticket_verification(ticket_id, signer_id, verified_at, accepted)
385 .await?;
386 Ok(())
387 }
388
389 async fn update_rejected_ticket(&self, ticket_id: i64) -> Result<(), GatewayStorageError> {
390 self.ticket_manager.set_rejected_ticket(ticket_id).await?;
392
393 self.ticket_manager.remove_ticket_data(ticket_id).await?;
396
397 Ok(())
398 }
399
400 async fn update_verified_ticket(&self, ticket_id: i64) -> Result<(), GatewayStorageError> {
401 self.ticket_manager
403 .insert_verified_ticket(ticket_id)
404 .await?;
405
406 self.ticket_manager
409 .remove_ticket_verification(ticket_id)
410 .await?;
411 Ok(())
412 }
413
414 async fn remove_verified_ticket_binary_data(
415 &self,
416 ticket_id: i64,
417 ) -> Result<(), GatewayStorageError> {
418 self.ticket_manager
419 .remove_binary_ticket_data(ticket_id)
420 .await?;
421 Ok(())
422 }
423
424 async fn get_all_verified_tickets_with_sn(
425 &self,
426 ) -> Result<Vec<VerifiedTicket>, GatewayStorageError> {
427 Ok(self
428 .ticket_manager
429 .get_all_verified_tickets_with_sn()
430 .await?)
431 }
432
433 async fn get_all_proposed_tickets_with_sn(
434 &self,
435 proposal_id: u32,
436 ) -> Result<Vec<VerifiedTicket>, GatewayStorageError> {
437 Ok(self
438 .ticket_manager
439 .get_all_proposed_tickets_with_sn(proposal_id as i64)
440 .await?)
441 }
442
443 async fn insert_redemption_proposal(
444 &self,
445 tickets: &[VerifiedTicket],
446 proposal_id: u32,
447 created_at: OffsetDateTime,
448 ) -> Result<(), GatewayStorageError> {
449 self.ticket_manager
453 .insert_redemption_proposal(proposal_id as i64, created_at)
454 .await?;
455
456 self.ticket_manager
458 .insert_verified_tickets_proposal_id(
459 tickets.iter().map(|t| t.ticket_id),
460 proposal_id as i64,
461 )
462 .await?;
463 Ok(())
464 }
465
466 async fn clear_post_proposal_data(
467 &self,
468 proposal_id: u32,
469 resolved_at: OffsetDateTime,
470 rejected: bool,
471 ) -> Result<(), GatewayStorageError> {
472 self.ticket_manager
474 .update_redemption_proposal(proposal_id as i64, resolved_at, rejected)
475 .await?;
476
477 self.ticket_manager
479 .remove_redeemed_tickets_data(proposal_id as i64)
480 .await?;
481
482 self.ticket_manager
484 .remove_verified_tickets(proposal_id as i64)
485 .await?;
486
487 Ok(())
488 }
489
490 async fn latest_proposal(&self) -> Result<Option<RedemptionProposal>, GatewayStorageError> {
491 Ok(self.ticket_manager.get_latest_redemption_proposal().await?)
492 }
493
494 async fn get_all_unverified_tickets(&self) -> Result<Vec<ClientTicket>, GatewayStorageError> {
495 self.ticket_manager
496 .get_unverified_tickets()
497 .await?
498 .into_iter()
499 .map(TryInto::try_into)
500 .collect()
501 }
502
503 async fn get_all_unresolved_proposals(&self) -> Result<Vec<i64>, GatewayStorageError> {
504 Ok(self
505 .ticket_manager
506 .get_all_unresolved_redemption_proposal_ids()
507 .await?)
508 }
509
510 async fn get_votes(&self, ticket_id: i64) -> Result<Vec<i64>, GatewayStorageError> {
511 Ok(self
512 .ticket_manager
513 .get_verification_votes(ticket_id)
514 .await?)
515 }
516
517 async fn get_signers(&self, epoch_id: i64) -> Result<Vec<i64>, GatewayStorageError> {
518 Ok(self.ticket_manager.get_epoch_signers(epoch_id).await?)
519 }
520
521 async fn insert_wireguard_peer(
527 &self,
528 peer: &defguard_wireguard_rs::host::Peer,
529 client_type: ClientType,
530 ) -> Result<i64, GatewayStorageError> {
531 let client_id = match self
532 .wireguard_peer_manager
533 .retrieve_peer(&peer.public_key.to_string())
534 .await?
535 {
536 Some(peer) => peer.client_id,
537 None => self.client_manager.insert_client(client_type).await?,
538 };
539 let peer = WireguardPeer::from_defguard_peer(peer.clone(), client_id)?;
540 self.wireguard_peer_manager.insert_peer(&peer).await?;
541 Ok(client_id)
542 }
543
544 async fn get_wireguard_peer(
550 &self,
551 peer_public_key: &str,
552 ) -> Result<Option<WireguardPeer>, GatewayStorageError> {
553 let peer = self
554 .wireguard_peer_manager
555 .retrieve_peer(peer_public_key)
556 .await?;
557 Ok(peer)
558 }
559
560 async fn get_all_wireguard_peers(&self) -> Result<Vec<WireguardPeer>, GatewayStorageError> {
562 let ret = self.wireguard_peer_manager.retrieve_all_peers().await?;
563 Ok(ret)
564 }
565
566 async fn remove_wireguard_peer(
572 &self,
573 peer_public_key: &str,
574 ) -> Result<(), GatewayStorageError> {
575 self.wireguard_peer_manager
576 .remove_peer(peer_public_key)
577 .await?;
578 Ok(())
579 }
580}