nym_gateway_stats_storage/
lib.rs1use error::StatsStorageError;
5use models::StoredFinishedSession;
6use nym_node_metrics::entry::{ActiveSession, FinishedSession};
7use nym_sphinx::DestinationAddressBytes;
8use nym_statistics_common::types::SessionType;
9use sessions::SessionManager;
10use sqlx::{
11 ConnectOptions,
12 sqlite::{SqliteAutoVacuum, SqliteSynchronous},
13};
14use std::path::Path;
15use time::Date;
16use tracing::{debug, error};
17
18pub mod error;
19pub mod models;
20mod sessions;
21
22#[derive(Clone)]
24pub struct PersistentStatsStorage {
25 session_manager: SessionManager,
26}
27
28impl PersistentStatsStorage {
29 pub async fn init<P: AsRef<Path> + Send>(database_path: P) -> Result<Self, StatsStorageError> {
35 debug!(
36 "Attempting to connect to database {}",
37 database_path.as_ref().display()
38 );
39
40 let opts = sqlx::sqlite::SqliteConnectOptions::new()
43 .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
44 .synchronous(SqliteSynchronous::Normal)
45 .auto_vacuum(SqliteAutoVacuum::Incremental)
46 .filename(database_path)
47 .create_if_missing(true)
48 .disable_statement_logging();
49
50 let connection_pool = match sqlx::SqlitePool::connect_with(opts).await {
53 Ok(db) => db,
54 Err(err) => {
55 error!("Failed to connect to SQLx database: {err}");
56 return Err(err.into());
57 }
58 };
59
60 if let Err(err) = sqlx::migrate!("./migrations").run(&connection_pool).await {
61 error!("Failed to perform migration on the SQLx database: {err}");
62 return Err(err.into());
63 }
64
65 Ok(PersistentStatsStorage {
67 session_manager: sessions::SessionManager::new(connection_pool),
68 })
69 }
70
71 pub async fn insert_finished_session(
73 &self,
74 date: Date,
75 session: FinishedSession,
76 ) -> Result<(), StatsStorageError> {
77 Ok(self
78 .session_manager
79 .insert_finished_session(
80 date,
81 session.duration.as_millis() as i64,
82 session.typ.to_string(),
83 )
84 .await?)
85 }
86
87 pub async fn get_finished_sessions(
88 &self,
89 date: Date,
90 ) -> Result<Vec<StoredFinishedSession>, StatsStorageError> {
91 Ok(self.session_manager.get_finished_sessions(date).await?)
92 }
93
94 pub async fn delete_finished_sessions(
95 &self,
96 before_date: Date,
97 ) -> Result<(), StatsStorageError> {
98 Ok(self
99 .session_manager
100 .delete_finished_sessions(before_date)
101 .await?)
102 }
103
104 pub async fn insert_unique_user(
105 &self,
106 date: Date,
107 client_address_bs58: String,
108 ) -> Result<(), StatsStorageError> {
109 Ok(self
110 .session_manager
111 .insert_unique_user(date, client_address_bs58)
112 .await?)
113 }
114
115 pub async fn get_unique_users(&self, date: Date) -> Result<Vec<String>, StatsStorageError> {
116 Ok(self.session_manager.get_unique_users(date).await?)
117 }
118
119 pub async fn delete_unique_users(&self, before_date: Date) -> Result<(), StatsStorageError> {
120 Ok(self
121 .session_manager
122 .delete_unique_users(before_date)
123 .await?)
124 }
125
126 pub async fn delete_unique_user(
127 &self,
128 client_address: DestinationAddressBytes,
129 ) -> Result<(), StatsStorageError> {
130 Ok(self
131 .session_manager
132 .delete_unique_user(client_address.as_base58_string())
133 .await?)
134 }
135
136 pub async fn insert_active_session(
137 &self,
138 client_address: DestinationAddressBytes,
139 session: ActiveSession,
140 ) -> Result<(), StatsStorageError> {
141 Ok(self
142 .session_manager
143 .insert_active_session(
144 client_address.as_base58_string(),
145 session.start,
146 session.typ.to_string(),
147 )
148 .await?)
149 }
150
151 pub async fn remember_active_session(
152 &self,
153 client_address: DestinationAddressBytes,
154 ) -> Result<(), StatsStorageError> {
155 Ok(self
156 .session_manager
157 .remember_active_session(client_address.as_base58_string())
158 .await?)
159 }
160
161 pub async fn update_active_session_type(
162 &self,
163 client_address: DestinationAddressBytes,
164 session_type: SessionType,
165 ) -> Result<(), StatsStorageError> {
166 Ok(self
167 .session_manager
168 .update_active_session_type(client_address.as_base58_string(), session_type.to_string())
169 .await?)
170 }
171
172 pub async fn get_active_session(
173 &self,
174 client_address: DestinationAddressBytes,
175 ) -> Result<Option<ActiveSession>, StatsStorageError> {
176 Ok(self
177 .session_manager
178 .get_active_session(client_address.as_base58_string())
179 .await?
180 .map(Into::into))
181 }
182
183 pub async fn get_all_active_sessions(&self) -> Result<Vec<ActiveSession>, StatsStorageError> {
184 Ok(self
185 .session_manager
186 .get_all_active_sessions()
187 .await?
188 .into_iter()
189 .map(Into::into)
190 .collect())
191 }
192
193 pub async fn get_started_sessions_count(
194 &self,
195 start_date: Date,
196 ) -> Result<i64, StatsStorageError> {
197 Ok(self
198 .session_manager
199 .get_started_sessions_count(start_date)
200 .await?)
201 }
202
203 pub async fn get_active_users(&self) -> Result<Vec<String>, StatsStorageError> {
204 Ok(self.session_manager.get_active_users().await?)
205 }
206
207 pub async fn delete_active_session(
208 &self,
209 client_address: DestinationAddressBytes,
210 ) -> Result<(), StatsStorageError> {
211 Ok(self
212 .session_manager
213 .delete_active_session(client_address.as_base58_string())
214 .await?)
215 }
216
217 pub async fn cleanup_active_sessions(&self) -> Result<(), StatsStorageError> {
218 Ok(self.session_manager.cleanup_active_sessions().await?)
219 }
220}