fedimint_testing/
federation.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use fedimint_api_client::api::{DynGlobalApi, FederationApiExt};
6use fedimint_client::module_init::ClientModuleInitRegistry;
7use fedimint_client::{Client, ClientHandleArc, RootSecret};
8use fedimint_client_module::AdminCreds;
9use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy};
10use fedimint_core::PeerId;
11use fedimint_core::config::{ClientConfig, FederationId, ServerModuleConfigGenParamsRegistry};
12use fedimint_core::core::ModuleKind;
13use fedimint_core::db::Database;
14use fedimint_core::db::mem_impl::MemDatabase;
15use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::module::{ApiAuth, ApiRequestErased};
18use fedimint_core::net::peers::IP2PConnections;
19use fedimint_core::rustls::install_crypto_provider;
20use fedimint_core::task::{TaskGroup, block_in_place, sleep_in_test};
21use fedimint_gateway_common::ConnectFedPayload;
22use fedimint_gateway_server::Gateway;
23use fedimint_logging::LOG_TEST;
24use fedimint_rocksdb::RocksDb;
25use fedimint_server::config::ServerConfig;
26use fedimint_server::core::ServerModuleInitRegistry;
27use fedimint_server::net::api::ApiSecrets;
28use fedimint_server::net::p2p::{
29    ReconnectP2PConnections, p2p_connection_type_channels, p2p_status_channels,
30};
31use fedimint_server::net::p2p_connector::{IP2PConnector, TlsTcpConnector};
32use fedimint_server::{ConnectionLimits, consensus};
33use fedimint_server_core::bitcoin_rpc::DynServerBitcoinRpc;
34use fedimint_testing_core::config::local_config_gen_params;
35use tracing::info;
36
37/// Test fixture for a running fedimint federation
38#[derive(Clone)]
39pub struct FederationTest {
40    configs: BTreeMap<PeerId, ServerConfig>,
41    server_init: ServerModuleInitRegistry,
42    client_init: ClientModuleInitRegistry,
43    primary_module_kind: ModuleKind,
44    _task: TaskGroup,
45    num_peers: u16,
46    num_offline: u16,
47}
48
49impl FederationTest {
50    /// Create two clients, useful for send/receive tests
51    pub async fn two_clients(&self) -> (ClientHandleArc, ClientHandleArc) {
52        (self.new_client().await, self.new_client().await)
53    }
54
55    /// Create a client connected to this fed
56    pub async fn new_client(&self) -> ClientHandleArc {
57        let client_config = self.configs[&PeerId::from(0)]
58            .consensus
59            .to_client_config(&self.server_init)
60            .unwrap();
61
62        self.new_client_with(client_config, MemDatabase::new().into(), None)
63            .await
64    }
65
66    /// Create a client connected to this fed but using RocksDB instead of MemDB
67    pub async fn new_client_rocksdb(&self) -> ClientHandleArc {
68        let client_config = self.configs[&PeerId::from(0)]
69            .consensus
70            .to_client_config(&self.server_init)
71            .unwrap();
72
73        self.new_client_with(
74            client_config,
75            RocksDb::open(tempfile::tempdir().expect("Couldn't create temp dir"))
76                .await
77                .expect("Couldn't open DB")
78                .into(),
79            None,
80        )
81        .await
82    }
83
84    /// Create a new admin api for the given PeerId
85    pub async fn new_admin_api(&self, peer_id: PeerId) -> anyhow::Result<DynGlobalApi> {
86        let config = self.configs.get(&peer_id).expect("peer to have config");
87
88        DynGlobalApi::new_admin(
89            peer_id,
90            config.consensus.api_endpoints()[&peer_id].url.clone(),
91            &None,
92            // No need to enable DHT during testing
93            false,
94            // No need to enable next stack during testing
95            false,
96        )
97        .await
98    }
99
100    /// Create a new admin client connected to this fed
101    pub async fn new_admin_client(&self, peer_id: PeerId, auth: ApiAuth) -> ClientHandleArc {
102        let client_config = self.configs[&PeerId::from(0)]
103            .consensus
104            .to_client_config(&self.server_init)
105            .unwrap();
106
107        let admin_creds = AdminCreds { peer_id, auth };
108
109        self.new_client_with(client_config, MemDatabase::new().into(), Some(admin_creds))
110            .await
111    }
112
113    pub async fn new_client_with(
114        &self,
115        client_config: ClientConfig,
116        db: Database,
117        admin_creds: Option<AdminCreds>,
118    ) -> ClientHandleArc {
119        info!(target: LOG_TEST, "Setting new client with config");
120        let mut client_builder = Client::builder().await.expect("Failed to build client");
121        client_builder.with_module_inits(self.client_init.clone());
122        client_builder.with_primary_module_kind(self.primary_module_kind.clone());
123        if let Some(admin_creds) = admin_creds {
124            client_builder.set_admin_creds(admin_creds);
125        }
126        let client_secret = Client::load_or_generate_client_secret(&db).await.unwrap();
127        client_builder
128            .preview_with_existing_config(client_config, None, None)
129            .await
130            .expect("Preview failed")
131            .join(
132                db,
133                RootSecret::StandardDoubleDerive(PlainRootSecretStrategy::to_root_secret(
134                    &client_secret,
135                )),
136            )
137            .await
138            .map(Arc::new)
139            .expect("Failed to build client")
140    }
141
142    /// Return first invite code for gateways
143    pub fn invite_code(&self) -> InviteCode {
144        self.configs[&PeerId::from(0)].get_invite_code(None)
145    }
146
147    ///  Return the federation id
148    pub fn id(&self) -> FederationId {
149        self.configs[&PeerId::from(0)]
150            .consensus
151            .to_client_config(&self.server_init)
152            .unwrap()
153            .global
154            .calculate_federation_id()
155    }
156
157    /// Connects a gateway to this `FederationTest`
158    pub async fn connect_gateway(&self, gw: &Gateway) {
159        gw.handle_connect_federation(ConnectFedPayload {
160            invite_code: self.invite_code().to_string(),
161            use_tor: Some(false),
162            recover: Some(false),
163        })
164        .await
165        .expect("Failed to connect federation");
166    }
167
168    /// Return all online PeerIds
169    pub fn online_peer_ids(&self) -> impl Iterator<Item = PeerId> + use<> {
170        // we can assume this ordering since peers are started in ascending order
171        (0..(self.num_peers - self.num_offline)).map(PeerId::from)
172    }
173
174    /// Returns true if the federation is running in a degraded state
175    pub fn is_degraded(&self) -> bool {
176        self.num_offline > 0
177    }
178}
179
180/// Builder struct for creating a `FederationTest`.
181#[derive(Clone, Debug)]
182pub struct FederationTestBuilder {
183    num_peers: u16,
184    num_offline: u16,
185    base_port: u16,
186    primary_module_kind: ModuleKind,
187    version_hash: String,
188    modules: ServerModuleConfigGenParamsRegistry,
189    server_init: ServerModuleInitRegistry,
190    client_init: ClientModuleInitRegistry,
191    bitcoin_rpc_connection: DynServerBitcoinRpc,
192    enable_mint_fees: bool,
193}
194
195impl FederationTestBuilder {
196    pub fn new(
197        params: ServerModuleConfigGenParamsRegistry,
198        server_init: ServerModuleInitRegistry,
199        client_init: ClientModuleInitRegistry,
200        primary_module_kind: ModuleKind,
201        num_offline: u16,
202        bitcoin_rpc_connection: DynServerBitcoinRpc,
203    ) -> FederationTestBuilder {
204        let num_peers = 4;
205        Self {
206            num_peers,
207            num_offline,
208            base_port: block_in_place(|| fedimint_portalloc::port_alloc(num_peers * 3))
209                .expect("Failed to allocate a port range"),
210            primary_module_kind,
211            version_hash: "fedimint-testing-dummy-version-hash".to_owned(),
212            modules: params,
213            server_init,
214            client_init,
215            bitcoin_rpc_connection,
216            enable_mint_fees: true,
217        }
218    }
219
220    pub fn num_peers(mut self, num_peers: u16) -> FederationTestBuilder {
221        self.num_peers = num_peers;
222        self
223    }
224
225    pub fn num_offline(mut self, num_offline: u16) -> FederationTestBuilder {
226        self.num_offline = num_offline;
227        self
228    }
229
230    pub fn base_port(mut self, base_port: u16) -> FederationTestBuilder {
231        self.base_port = base_port;
232        self
233    }
234
235    pub fn primary_module_kind(mut self, primary_module_kind: ModuleKind) -> FederationTestBuilder {
236        self.primary_module_kind = primary_module_kind;
237        self
238    }
239
240    pub fn version_hash(mut self, version_hash: String) -> FederationTestBuilder {
241        self.version_hash = version_hash;
242        self
243    }
244
245    pub fn disable_mint_fees(mut self) -> FederationTestBuilder {
246        self.enable_mint_fees = false;
247        self
248    }
249
250    #[allow(clippy::too_many_lines)]
251    pub async fn build(self) -> FederationTest {
252        install_crypto_provider().await;
253        let num_offline = self.num_offline;
254        assert!(
255            self.num_peers > 3 * self.num_offline,
256            "too many peers offline ({num_offline}) to reach consensus"
257        );
258        let peers = (0..self.num_peers).map(PeerId::from).collect::<Vec<_>>();
259        let params = local_config_gen_params(&peers, self.base_port, self.enable_mint_fees)
260            .expect("Generates local config");
261
262        let configs = ServerConfig::trusted_dealer_gen(
263            self.modules,
264            &params,
265            &self.server_init,
266            &self.version_hash,
267        );
268
269        let task_group = TaskGroup::new();
270        for (peer_id, cfg) in configs.clone() {
271            let peer_port = self.base_port + u16::from(peer_id) * 3;
272
273            let p2p_bind = format!("127.0.0.1:{peer_port}").parse().unwrap();
274            let api_bind = format!("127.0.0.1:{}", peer_port + 1).parse().unwrap();
275            let ui_bind = format!("127.0.0.1:{}", peer_port + 2).parse().unwrap();
276
277            if u16::from(peer_id) >= self.num_peers - self.num_offline {
278                continue;
279            }
280
281            let instances = cfg.consensus.iter_module_instances();
282            let decoders = self.server_init.available_decoders(instances).unwrap();
283            let db = Database::new(MemDatabase::new(), decoders);
284            let module_init_registry = self.server_init.clone();
285            let subgroup = task_group.make_subgroup();
286            let checkpoint_dir = tempfile::Builder::new().tempdir().unwrap().keep();
287            let code_version_str = env!("CARGO_PKG_VERSION");
288
289            let connector = TlsTcpConnector::new(
290                cfg.tls_config(),
291                p2p_bind,
292                cfg.local.p2p_endpoints.clone(),
293                cfg.local.identity,
294            )
295            .await
296            .into_dyn();
297
298            let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
299            let (p2p_connection_type_senders, p2p_connection_type_receivers) =
300                p2p_connection_type_channels(connector.peers());
301
302            let connections = ReconnectP2PConnections::new(
303                cfg.local.identity,
304                connector,
305                &task_group,
306                p2p_status_senders,
307                p2p_connection_type_senders,
308            )
309            .into_dyn();
310
311            let bitcoin_rpc_connection = self.bitcoin_rpc_connection.clone();
312
313            task_group.spawn("fedimintd", move |_| async move {
314                Box::pin(consensus::run(
315                    connections,
316                    p2p_status_receivers,
317                    p2p_connection_type_receivers,
318                    api_bind,
319                    None,
320                    vec![],
321                    cfg.clone(),
322                    db.clone(),
323                    module_init_registry,
324                    &subgroup,
325                    ApiSecrets::default(),
326                    checkpoint_dir,
327                    code_version_str.to_string(),
328                    bitcoin_rpc_connection,
329                    ui_bind,
330                    Box::new(|_| axum::Router::new()),
331                    1,
332                    ConnectionLimits {
333                        max_connections: 1000,
334                        max_requests_per_connection: 100,
335                    },
336                ))
337                .await
338                .expect("Could not initialise consensus");
339            });
340        }
341
342        for (peer_id, config) in configs.clone() {
343            if u16::from(peer_id) >= self.num_peers - self.num_offline {
344                continue;
345            }
346
347            // FIXME: (@leonardo) Currently there is no support for Tor while testing,
348            // defaulting to Tcp variant.
349            let api = DynGlobalApi::new_admin(
350                peer_id,
351                config.consensus.api_endpoints()[&peer_id].url.clone(),
352                &None,
353                // No need for dht when testing
354                false,
355                false,
356            )
357            .await
358            .unwrap();
359
360            while let Err(e) = api
361                .request_admin_no_auth::<u64>(SESSION_COUNT_ENDPOINT, ApiRequestErased::default())
362                .await
363            {
364                sleep_in_test(
365                    format!("Waiting for api of peer {peer_id} to come online: {e}"),
366                    Duration::from_millis(500),
367                )
368                .await;
369            }
370        }
371
372        FederationTest {
373            configs,
374            server_init: self.server_init,
375            client_init: self.client_init,
376            primary_module_kind: self.primary_module_kind,
377            _task: task_group,
378            num_peers: self.num_peers,
379            num_offline: self.num_offline,
380        }
381    }
382}