1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
//!
//! The application context shared between all components.
//! Think of it as a simple Dependency Injection container.
//!
//! Create with a `DataDir` instance: `AppContext::read_from(data_dir).await`
//!
use crate::services::user_service::UserService;
#[cfg(any(test, feature = "testing"))]
use crate::MockDataDir;
use crate::{
metrics_server::routes::metrics::{Metrics, MetricsInitError},
persistence::{
files::{events::EventsService, FileIoError, FileService},
sql::{Migrator, PgEventListener, SqlDb},
},
ConfigToml, DataDir,
};
use pubky_common::crypto::Keypair;
use std::sync::Arc;
use std::time::Duration;
/// Errors that can occur when building an `AppContext` from a `DataDir`.
///
/// Returned by `AppContext::read_from`. Each variant corresponds to one
/// step of the build and carries the underlying error of that step, which
/// is also included in the variant's `Display` output.
#[derive(Debug, thiserror::Error)]
pub enum AppContextConversionError {
/// Failed to ensure data directory exists and is writable
/// (`DataDir::ensure_data_dir_exists_and_is_writable`).
#[error("Failed to ensure data directory exists and is writable: {0}")]
DataDir(anyhow::Error),
/// Failed to read or create config file
/// (`DataDir::read_or_create_config_file`).
#[error("Failed to read or create config file: {0}")]
Config(anyhow::Error),
/// Failed to read or create keypair
/// (`DataDir::read_or_create_keypair`).
#[error("Failed to read or create keypair: {0}")]
Keypair(anyhow::Error),
/// Failed to open SQL DB (`SqlDb::connect`).
#[error("Failed to open SQL DB: {0}")]
SqlDb(sqlx::Error),
/// Failed to run migrations (`Migrator::run`).
#[error("Failed to run migrations: {0}")]
Migrations(anyhow::Error),
/// Failed to build storage operator (`FileService::new_from_config`).
#[error("Failed to build storage operator: {0}")]
Storage(FileIoError),
/// Failed to build pkarr client (`pkarr::ClientBuilder::build`).
#[error("Failed to build pkarr client: {0}")]
Pkarr(pkarr::errors::BuildError),
/// Failed to start the Postgres event listener (`PgEventListener::start`).
#[error("Failed to start Postgres event listener: {0}")]
PgEventListener(sqlx::Error),
/// Failed to initialize metrics (`Metrics::new`).
#[error("Failed to initialize metrics: {0}")]
Metrics(MetricsInitError),
}
/// The application context shared between all components.
/// Think of it as a simple Dependency Injection container.
///
/// Create with a `DataDir` instance: `AppContext::read_from(data_dir).await`
///
/// The struct derives `Clone`, so it can be handed to each component by value.
#[derive(Clone)]
pub struct AppContext {
/// The SQL database connection.
pub(crate) sql_db: SqlDb,
/// The storage operator to store files.
pub(crate) file_service: FileService,
/// The configuration read from (or newly created in) the data directory.
pub(crate) config_toml: ConfigToml,
/// Keep data_dir alive. The mock dir will cleanup on drop.
pub(crate) data_dir: Arc<dyn DataDir>,
/// The keypair read from (or newly created in) the data directory.
pub(crate) keypair: Keypair,
/// Main pkarr instance. This will automatically turn into a DHT server 15 minutes after startup.
/// We need to keep this alive.
pub(crate) pkarr_client: pkarr::Client,
/// pkarr client builder in case we need to create more instances.
/// Comes ready with the correct bootstrap nodes and relays.
pub(crate) pkarr_builder: pkarr::ClientBuilder,
/// Events service for managing event creation and broadcasting.
pub(crate) events_service: EventsService,
/// Metrics for all endpoints.
pub(crate) metrics: Metrics,
/// Background listener for Postgres event notifications.
/// Enables cross-instance event propagation for /events-stream's SSE functionality.
/// Kept alive for the background task, not for direct access.
_pg_event_listener: Arc<PgEventListener>,
/// User service for quota resolution and user creation with defaults.
pub(crate) user_service: UserService,
}
impl AppContext {
    /// Build an `AppContext` on top of a `MockDataDir`, for use in tests.
    ///
    /// # Panics
    ///
    /// Panics if the context cannot be built from the mock data directory.
    #[cfg(any(test, feature = "testing"))]
    pub async fn test() -> Self {
        let built = Self::read_from(MockDataDir::test()).await;
        built.expect("failed to build AppContext from DataDirMock")
    }

    /// Build an `AppContext` from the given data directory.
    ///
    /// The steps run in this order: verify the data directory, load (or
    /// create) the config file and keypair, connect to the SQL database and
    /// run migrations, start the events service and the Postgres event
    /// listener, create the user and file services, and finally build the
    /// pkarr client and the metrics.
    ///
    /// # Errors
    ///
    /// Returns the `AppContextConversionError` variant matching the first
    /// step that fails; later steps are not attempted.
    pub async fn read_from<D: DataDir + 'static>(
        dir: D,
    ) -> Result<Self, AppContextConversionError> {
        // Everything below reads from or writes into the data directory,
        // so make sure it is usable first.
        dir.ensure_data_dir_exists_and_is_writable()
            .map_err(AppContextConversionError::DataDir)?;

        let config_toml = dir
            .read_or_create_config_file()
            .map_err(AppContextConversionError::Config)?;
        let keypair = dir
            .read_or_create_keypair()
            .map_err(AppContextConversionError::Keypair)?;

        // Database: connect, then bring the schema up to date.
        let sql_db = Self::connect_to_sql_db(&config_toml).await?;
        let migrator = Migrator::new(&sql_db);
        migrator
            .run()
            .await
            .map_err(AppContextConversionError::Migrations)?;

        // Events: the listener receives a clone of the events service.
        let events_service = EventsService::new(1000);
        let listener = PgEventListener::start(sql_db.pool(), events_service.clone())
            .await
            .map_err(AppContextConversionError::PgEventListener)?;

        let user_service = UserService::new(sql_db.clone());
        let file_service = FileService::new_from_config(
            &config_toml,
            dir.path(),
            sql_db.clone(),
            events_service.clone(),
            user_service.clone(),
        )
        .map_err(AppContextConversionError::Storage)?;

        // The builder is kept alongside the client, so build from a clone.
        let pkarr_builder = Self::build_pkarr_builder_from_config(&config_toml);
        let pkarr_client = pkarr_builder
            .clone()
            .build()
            .map_err(AppContextConversionError::Pkarr)?;

        let metrics = Metrics::new().map_err(AppContextConversionError::Metrics)?;

        Ok(Self {
            sql_db,
            pkarr_client,
            file_service,
            pkarr_builder,
            config_toml,
            keypair,
            data_dir: Arc::new(dir),
            events_service,
            metrics,
            _pg_event_listener: Arc::new(listener),
            user_service,
        })
    }
}
impl AppContext {
    /// Create a pkarr client builder configured from the `pkdns` section of
    /// the config.
    ///
    /// Applies, when present: custom DHT bootstrap nodes (which also
    /// disables the default relays), custom relays, and the DHT request
    /// timeout.
    ///
    /// # Panics
    ///
    /// Panics if the builder rejects the configured relays.
    fn build_pkarr_builder_from_config(config_toml: &ConfigToml) -> pkarr::ClientBuilder {
        let pkdns = &config_toml.pkdns;
        let mut builder = pkarr::ClientBuilder::default();

        if let Some(bootstrap_nodes) = &pkdns.dht_bootstrap_nodes {
            let addresses: Vec<String> = bootstrap_nodes
                .iter()
                .map(|address| address.to_string())
                .collect();
            builder.bootstrap(&addresses);
            // With custom bootstrap nodes the default pkarr relays must not be used.
            // Otherwise we could end up with a DHT made of testnet bootstrap nodes and
            // mainnet relays, which would give very weird results.
            builder.no_relays();
        }

        if let Some(relays) = &pkdns.dht_relay_nodes {
            builder
                .relays(relays)
                .expect("parameters are already urls and therefore valid.");
        }

        if let Some(timeout_ms) = &pkdns.dht_request_timeout_ms {
            builder.request_timeout(Duration::from_millis(timeout_ms.get()));
        }

        builder
    }

    /// Connect to the SQL database.
    ///
    /// In a test build (`test` or the `testing` feature), a test db
    /// connection string yields an ephemeral test db instead.
    /// In every other case the configured database url is used.
    ///
    /// # Errors
    ///
    /// Returns `AppContextConversionError::SqlDb` if connecting to the
    /// configured database fails.
    async fn connect_to_sql_db(
        config_toml: &ConfigToml,
    ) -> Result<SqlDb, AppContextConversionError> {
        let database_url = &config_toml.general.database_url;

        // Only compiled into test builds: short-circuit to an ephemeral
        // test db when the connection string asks for one.
        #[cfg(any(test, feature = "testing"))]
        {
            if database_url.is_test_db() {
                return Ok(SqlDb::test().await);
            }
        }

        // Normal db connection, shared by test and non-test builds.
        SqlDb::connect(database_url)
            .await
            .map_err(AppContextConversionError::SqlDb)
    }
}