nym_gateway_storage/
lib.rs

1// Copyright 2020 - Nym Technologies SA <contact@nymtech.net>
2// SPDX-License-Identifier: GPL-3.0-only
3
4use async_trait::async_trait;
5use bandwidth::BandwidthManager;
6use clients::{ClientManager, ClientType};
7use models::{
8    Client, PersistedBandwidth, PersistedSharedKeys, RedemptionProposal, StoredMessage,
9    VerifiedTicket, WireguardPeer,
10};
11use nym_credentials_interface::ClientTicket;
12use nym_gateway_requests::shared_key::SharedSymmetricKey;
13use nym_sphinx::DestinationAddressBytes;
14use shared_keys::SharedKeysManager;
15use sqlx::{
16    ConnectOptions,
17    sqlite::{SqliteAutoVacuum, SqliteSynchronous},
18};
19use std::{path::Path, time::Duration};
20use tickets::TicketStorageManager;
21use time::OffsetDateTime;
22use tracing::{debug, error, log::LevelFilter};
23
24pub mod bandwidth;
25mod clients;
26pub mod error;
27mod inboxes;
28pub mod models;
29mod shared_keys;
30mod tickets;
31pub mod traits;
32mod wireguard_peers;
33
34pub use error::GatewayStorageError;
35pub use inboxes::InboxManager;
36
37use crate::traits::{BandwidthGatewayStorage, InboxGatewayStorage, SharedKeyGatewayStorage};
38
39fn make_bincode_serializer() -> impl bincode::Options {
40    use bincode::Options;
41    bincode::DefaultOptions::new()
42        .with_big_endian()
43        .with_varint_encoding()
44}
45
/// Entry point to the gateway's persistent storage.
///
/// Bundles one manager per storage concern; all of them are created from the same
/// SQLite connection pool (see `from_connection_pool`).
// note that clone here is fine as upon cloning the same underlying pool will be used
#[derive(Clone)]
pub struct GatewayStorage {
    /// Client records (insertion and lookup of clients by id).
    client_manager: ClientManager,
    /// Shared keys of clients, keyed by the base58-encoded client address.
    shared_key_manager: SharedKeysManager,
    /// Messages stored for clients.
    inbox_manager: InboxManager,
    /// Bandwidth entries of clients.
    bandwidth_manager: BandwidthManager,
    /// Tickets, their verification data, epoch signers and redemption proposals.
    ticket_manager: TicketStorageManager,
    /// Wireguard peers.
    wireguard_peer_manager: wireguard_peers::WgPeerManager,
}
56
57impl GatewayStorage {
58    #[allow(dead_code)]
59    pub(crate) fn client_manager(&self) -> &ClientManager {
60        &self.client_manager
61    }
62
63    pub(crate) fn shared_key_manager(&self) -> &SharedKeysManager {
64        &self.shared_key_manager
65    }
66
67    pub fn inbox_manager(&self) -> &InboxManager {
68        &self.inbox_manager
69    }
70
71    pub(crate) fn bandwidth_manager(&self) -> &BandwidthManager {
72        &self.bandwidth_manager
73    }
74
75    #[allow(dead_code)]
76    pub(crate) fn ticket_manager(&self) -> &TicketStorageManager {
77        &self.ticket_manager
78    }
79
80    #[allow(dead_code)]
81    pub(crate) fn wireguard_peer_manager(&self) -> &wireguard_peers::WgPeerManager {
82        &self.wireguard_peer_manager
83    }
84
85    pub async fn handle_forget_me(
86        &self,
87        client_address: DestinationAddressBytes,
88    ) -> Result<(), GatewayStorageError> {
89        let client_id = self.get_mixnet_client_id(client_address).await?;
90        self.inbox_manager()
91            .remove_messages_for_client(&client_address.as_base58_string())
92            .await?;
93        self.bandwidth_manager().remove_client(client_id).await?;
94        self.shared_key_manager()
95            .remove_shared_keys(&client_address.as_base58_string())
96            .await?;
97        Ok(())
98    }
99
100    /// Initialises `PersistentStorage` using the provided path.
101    ///
102    /// # Arguments
103    ///
104    /// * `database_path`: path to the database.
105    /// * `message_retrieval_limit`: maximum number of stored client messages that can be retrieved at once.
106    pub async fn init<P: AsRef<Path> + Send>(
107        database_path: P,
108        message_retrieval_limit: i64,
109    ) -> Result<Self, GatewayStorageError> {
110        debug!(
111            "Attempting to connect to database {}",
112            database_path.as_ref().display()
113        );
114
115        // TODO: we can inject here more stuff based on our gateway global config
116        // struct. Maybe different pool size or timeout intervals?
117        let opts = sqlx::sqlite::SqliteConnectOptions::new()
118            .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
119            .synchronous(SqliteSynchronous::Normal)
120            .auto_vacuum(SqliteAutoVacuum::Incremental)
121            .log_slow_statements(LevelFilter::Warn, Duration::from_millis(250))
122            .filename(database_path)
123            .create_if_missing(true)
124            .disable_statement_logging();
125
126        // TODO: do we want auto_vacuum ?
127
128        let connection_pool = match sqlx::SqlitePool::connect_with(opts).await {
129            Ok(db) => db,
130            Err(err) => {
131                error!("Failed to connect to SQLx database: {err}");
132                return Err(err.into());
133            }
134        };
135
136        Self::from_connection_pool(connection_pool, message_retrieval_limit).await
137    }
138
139    pub async fn from_connection_pool(
140        connection_pool: sqlx::sqlite::SqlitePool,
141        message_retrieval_limit: i64,
142    ) -> Result<Self, GatewayStorageError> {
143        if let Err(err) = sqlx::migrate!("./migrations").run(&connection_pool).await {
144            error!("Failed to perform migration on the SQLx database: {err}");
145            return Err(err.into());
146        }
147
148        // the cloning here are cheap as connection pool is stored behind an Arc
149        Ok(GatewayStorage {
150            client_manager: clients::ClientManager::new(connection_pool.clone()),
151            wireguard_peer_manager: wireguard_peers::WgPeerManager::new(connection_pool.clone()),
152            shared_key_manager: SharedKeysManager::new(connection_pool.clone()),
153            inbox_manager: InboxManager::new(connection_pool.clone(), message_retrieval_limit),
154            bandwidth_manager: BandwidthManager::new(connection_pool.clone()),
155            ticket_manager: TicketStorageManager::new(connection_pool),
156        })
157    }
158}
159
160#[async_trait]
161impl SharedKeyGatewayStorage for GatewayStorage {
162    async fn get_mixnet_client_id(
163        &self,
164        client_address: DestinationAddressBytes,
165    ) -> Result<i64, GatewayStorageError> {
166        Ok(self
167            .shared_key_manager
168            .client_id(&client_address.as_base58_string())
169            .await?)
170    }
171
172    async fn insert_shared_keys(
173        &self,
174        client_address: DestinationAddressBytes,
175        shared_keys: &SharedSymmetricKey,
176    ) -> Result<i64, GatewayStorageError> {
177        let client_address_bs58 = client_address.as_base58_string();
178        let client_id = match self
179            .shared_key_manager
180            .client_id(&client_address_bs58)
181            .await
182        {
183            Ok(client_id) => client_id,
184            _ => {
185                self.client_manager
186                    .insert_client(ClientType::EntryMixnet)
187                    .await?
188            }
189        };
190        self.shared_key_manager
191            .insert_shared_keys(
192                client_id,
193                client_address_bs58,
194                shared_keys.to_bytes().as_ref(),
195            )
196            .await?;
197        Ok(client_id)
198    }
199
200    async fn get_shared_keys(
201        &self,
202        client_address: DestinationAddressBytes,
203    ) -> Result<Option<PersistedSharedKeys>, GatewayStorageError> {
204        let keys = self
205            .shared_key_manager
206            .get_shared_keys(&client_address.as_base58_string())
207            .await?;
208        Ok(keys)
209    }
210
211    #[allow(dead_code)]
212    async fn remove_shared_keys(
213        &self,
214        client_address: DestinationAddressBytes,
215    ) -> Result<(), GatewayStorageError> {
216        self.shared_key_manager
217            .remove_shared_keys(&client_address.as_base58_string())
218            .await?;
219        Ok(())
220    }
221
222    async fn update_last_used_authentication_timestamp(
223        &self,
224        client_id: i64,
225        last_used_authentication_timestamp: OffsetDateTime,
226    ) -> Result<(), GatewayStorageError> {
227        self.shared_key_manager
228            .update_last_used_authentication_timestamp(
229                client_id,
230                last_used_authentication_timestamp,
231            )
232            .await?;
233        Ok(())
234    }
235
236    async fn get_client(&self, client_id: i64) -> Result<Option<Client>, GatewayStorageError> {
237        let client = self.client_manager.get_client(client_id).await?;
238        Ok(client)
239    }
240}
241
242#[async_trait]
243impl InboxGatewayStorage for GatewayStorage {
244    async fn store_message(
245        &self,
246        client_address: DestinationAddressBytes,
247        message: Vec<u8>,
248    ) -> Result<(), GatewayStorageError> {
249        self.inbox_manager
250            .insert_message(&client_address.as_base58_string(), message)
251            .await?;
252        Ok(())
253    }
254
255    async fn retrieve_messages(
256        &self,
257        client_address: DestinationAddressBytes,
258        start_after: Option<i64>,
259    ) -> Result<(Vec<StoredMessage>, Option<i64>), GatewayStorageError> {
260        let messages = self
261            .inbox_manager
262            .get_messages(&client_address.as_base58_string(), start_after)
263            .await?;
264        Ok(messages)
265    }
266
267    async fn remove_messages(&self, ids: Vec<i64>) -> Result<(), GatewayStorageError> {
268        for id in ids {
269            self.inbox_manager.remove_message(id).await?;
270        }
271        Ok(())
272    }
273}
274
275#[async_trait]
276impl BandwidthGatewayStorage for GatewayStorage {
277    async fn create_bandwidth_entry(&self, client_id: i64) -> Result<(), GatewayStorageError> {
278        self.bandwidth_manager.insert_new_client(client_id).await?;
279        Ok(())
280    }
281
282    async fn set_expiration(
283        &self,
284        client_id: i64,
285        expiration: OffsetDateTime,
286    ) -> Result<(), GatewayStorageError> {
287        self.bandwidth_manager
288            .set_expiration(client_id, expiration)
289            .await?;
290        Ok(())
291    }
292
293    async fn reset_bandwidth(&self, client_id: i64) -> Result<(), GatewayStorageError> {
294        self.bandwidth_manager.reset_bandwidth(client_id).await?;
295        Ok(())
296    }
297
298    async fn get_available_bandwidth(
299        &self,
300        client_id: i64,
301    ) -> Result<Option<PersistedBandwidth>, GatewayStorageError> {
302        Ok(self
303            .bandwidth_manager
304            .get_available_bandwidth(client_id)
305            .await?)
306    }
307
308    async fn increase_bandwidth(
309        &self,
310        client_id: i64,
311        amount: i64,
312    ) -> Result<i64, GatewayStorageError> {
313        Ok(self
314            .bandwidth_manager
315            .increase_bandwidth(client_id, amount)
316            .await?)
317    }
318
319    async fn revoke_ticket_bandwidth(
320        &self,
321        ticket_id: i64,
322        amount: i64,
323    ) -> Result<(), GatewayStorageError> {
324        Ok(self
325            .bandwidth_manager
326            .revoke_ticket_bandwidth(ticket_id, amount)
327            .await?)
328    }
329
330    async fn decrease_bandwidth(
331        &self,
332        client_id: i64,
333        amount: i64,
334    ) -> Result<i64, GatewayStorageError> {
335        Ok(self
336            .bandwidth_manager
337            .decrease_bandwidth(client_id, amount)
338            .await?)
339    }
340
341    async fn insert_epoch_signers(
342        &self,
343        epoch_id: i64,
344        signer_ids: Vec<i64>,
345    ) -> Result<(), GatewayStorageError> {
346        self.ticket_manager
347            .insert_ecash_signers(epoch_id, signer_ids)
348            .await?;
349        Ok(())
350    }
351
352    async fn insert_received_ticket(
353        &self,
354        client_id: i64,
355        received_at: OffsetDateTime,
356        serial_number: Vec<u8>,
357        data: Vec<u8>,
358    ) -> Result<i64, GatewayStorageError> {
359        // technically if we crash between those 2 calls we'll have a bit of data inconsistency,
360        // but nothing too tragic. we just won't get paid for a single ticket
361        let ticket_id = self
362            .ticket_manager
363            .insert_new_ticket(client_id, received_at)
364            .await?;
365        self.ticket_manager
366            .insert_ticket_data(ticket_id, &serial_number, &data)
367            .await?;
368
369        Ok(ticket_id)
370    }
371
372    async fn contains_ticket(&self, serial_number: &[u8]) -> Result<bool, GatewayStorageError> {
373        Ok(self.ticket_manager.has_ticket_data(serial_number).await?)
374    }
375
376    async fn insert_ticket_verification(
377        &self,
378        ticket_id: i64,
379        signer_id: i64,
380        verified_at: OffsetDateTime,
381        accepted: bool,
382    ) -> Result<(), GatewayStorageError> {
383        self.ticket_manager
384            .insert_ticket_verification(ticket_id, signer_id, verified_at, accepted)
385            .await?;
386        Ok(())
387    }
388
389    async fn update_rejected_ticket(&self, ticket_id: i64) -> Result<(), GatewayStorageError> {
390        // set the ticket as rejected
391        self.ticket_manager.set_rejected_ticket(ticket_id).await?;
392
393        // drop all ticket_data - we no longer need it
394        // TODO: or maybe we do as a proof of receiving bad data?
395        self.ticket_manager.remove_ticket_data(ticket_id).await?;
396
397        Ok(())
398    }
399
400    async fn update_verified_ticket(&self, ticket_id: i64) -> Result<(), GatewayStorageError> {
401        // 1. insert into verified table
402        self.ticket_manager
403            .insert_verified_ticket(ticket_id)
404            .await?;
405
406        // TODO: maybe we want to leave that be until ticket gets fully redeemed instead?
407        // 2. remove individual verifications
408        self.ticket_manager
409            .remove_ticket_verification(ticket_id)
410            .await?;
411        Ok(())
412    }
413
414    async fn remove_verified_ticket_binary_data(
415        &self,
416        ticket_id: i64,
417    ) -> Result<(), GatewayStorageError> {
418        self.ticket_manager
419            .remove_binary_ticket_data(ticket_id)
420            .await?;
421        Ok(())
422    }
423
424    async fn get_all_verified_tickets_with_sn(
425        &self,
426    ) -> Result<Vec<VerifiedTicket>, GatewayStorageError> {
427        Ok(self
428            .ticket_manager
429            .get_all_verified_tickets_with_sn()
430            .await?)
431    }
432
433    async fn get_all_proposed_tickets_with_sn(
434        &self,
435        proposal_id: u32,
436    ) -> Result<Vec<VerifiedTicket>, GatewayStorageError> {
437        Ok(self
438            .ticket_manager
439            .get_all_proposed_tickets_with_sn(proposal_id as i64)
440            .await?)
441    }
442
443    async fn insert_redemption_proposal(
444        &self,
445        tickets: &[VerifiedTicket],
446        proposal_id: u32,
447        created_at: OffsetDateTime,
448    ) -> Result<(), GatewayStorageError> {
449        // if we crash between those, there might a bit of an issue. we should revisit it later
450
451        // 1. insert the actual proposal
452        self.ticket_manager
453            .insert_redemption_proposal(proposal_id as i64, created_at)
454            .await?;
455
456        // 2. update all the associated tickets
457        self.ticket_manager
458            .insert_verified_tickets_proposal_id(
459                tickets.iter().map(|t| t.ticket_id),
460                proposal_id as i64,
461            )
462            .await?;
463        Ok(())
464    }
465
466    async fn clear_post_proposal_data(
467        &self,
468        proposal_id: u32,
469        resolved_at: OffsetDateTime,
470        rejected: bool,
471    ) -> Result<(), GatewayStorageError> {
472        // 1. update proposal metadata
473        self.ticket_manager
474            .update_redemption_proposal(proposal_id as i64, resolved_at, rejected)
475            .await?;
476
477        // 2. remove ticket data rows (we can drop serial numbers)
478        self.ticket_manager
479            .remove_redeemed_tickets_data(proposal_id as i64)
480            .await?;
481
482        // 3. remove verified tickets rows
483        self.ticket_manager
484            .remove_verified_tickets(proposal_id as i64)
485            .await?;
486
487        Ok(())
488    }
489
490    async fn latest_proposal(&self) -> Result<Option<RedemptionProposal>, GatewayStorageError> {
491        Ok(self.ticket_manager.get_latest_redemption_proposal().await?)
492    }
493
494    async fn get_all_unverified_tickets(&self) -> Result<Vec<ClientTicket>, GatewayStorageError> {
495        self.ticket_manager
496            .get_unverified_tickets()
497            .await?
498            .into_iter()
499            .map(TryInto::try_into)
500            .collect()
501    }
502
503    async fn get_all_unresolved_proposals(&self) -> Result<Vec<i64>, GatewayStorageError> {
504        Ok(self
505            .ticket_manager
506            .get_all_unresolved_redemption_proposal_ids()
507            .await?)
508    }
509
510    async fn get_votes(&self, ticket_id: i64) -> Result<Vec<i64>, GatewayStorageError> {
511        Ok(self
512            .ticket_manager
513            .get_verification_votes(ticket_id)
514            .await?)
515    }
516
517    async fn get_signers(&self, epoch_id: i64) -> Result<Vec<i64>, GatewayStorageError> {
518        Ok(self.ticket_manager.get_epoch_signers(epoch_id).await?)
519    }
520
521    /// Insert a wireguard peer in the storage.
522    ///
523    /// # Arguments
524    ///
525    /// * `peer`: wireguard peer data to be stored
526    async fn insert_wireguard_peer(
527        &self,
528        peer: &defguard_wireguard_rs::host::Peer,
529        client_type: ClientType,
530    ) -> Result<i64, GatewayStorageError> {
531        let client_id = match self
532            .wireguard_peer_manager
533            .retrieve_peer(&peer.public_key.to_string())
534            .await?
535        {
536            Some(peer) => peer.client_id,
537            None => self.client_manager.insert_client(client_type).await?,
538        };
539        let peer = WireguardPeer::from_defguard_peer(peer.clone(), client_id)?;
540        self.wireguard_peer_manager.insert_peer(&peer).await?;
541        Ok(client_id)
542    }
543
544    /// Tries to retrieve available bandwidth for the particular peer.
545    ///
546    /// # Arguments
547    ///
548    /// * `peer_public_key`: wireguard public key of the peer to be retrieved.
549    async fn get_wireguard_peer(
550        &self,
551        peer_public_key: &str,
552    ) -> Result<Option<WireguardPeer>, GatewayStorageError> {
553        let peer = self
554            .wireguard_peer_manager
555            .retrieve_peer(peer_public_key)
556            .await?;
557        Ok(peer)
558    }
559
560    /// Retrieves all wireguard peers.
561    async fn get_all_wireguard_peers(&self) -> Result<Vec<WireguardPeer>, GatewayStorageError> {
562        let ret = self.wireguard_peer_manager.retrieve_all_peers().await?;
563        Ok(ret)
564    }
565
566    /// Remove a wireguard peer from the storage.
567    ///
568    /// # Arguments
569    ///
570    /// * `peer_public_key`: wireguard public key of the peer to be removed.
571    async fn remove_wireguard_peer(
572        &self,
573        peer_public_key: &str,
574    ) -> Result<(), GatewayStorageError> {
575        self.wireguard_peer_manager
576            .remove_peer(peer_public_key)
577            .await?;
578        Ok(())
579    }
580}