flmodules 0.10.0

Modules used in fledger
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
//! # The signalling server implementation
//!
//! This structure implements a signalling server.
//! It communicates using [`WSSignalMessageToNode`] and [`WSSignalMessageFromNode`] messages
//! with the nodes.
//!
//! # Node connection to the signalling server
//!
//! If a node wants to connect to the signalling server, it needs to prove it holds the
//! private key corresponding to its public key by signing a random message sent by
//! the signalling server:
//!
//! - Server sends [`WSSignalMessageToNode::Challenge`] with the current version of the
//! server and a 256-bit random challenge
//! - Node sends [`WSSignalMessageFromNode::Announce`] containing the [`MessageAnnounce`]
//! with the node-information and a signature of the challenge
//! - Server sends [`WSSignalMessageToNode::ListIDsReply`] if the signature has been
//! verified successfully, else it waits for another announce-message
//!
//! # WebRTC signalling setup
//!
//! Once a node has been connected to the signalling server, other nodes can start a
//! WebRTC connection with it.
//! For this, the nodes will send several [`WSSignalMessageFromNode::PeerSetup`] messages
//! that the signalling server will redirect to the other node.
//! These messages contain the [`PeerInfo`] and the [`crate::web_rtc::messages::PeerMessage`] needed to setup a WebRTC connection.
//! One of the nodes is the initializer, while the other is the follower:
//!
//! - Initializer sends a [`crate::web_rtc::messages::PeerMessage::Init`] to the follower
//! - The follower answers with a [`crate::web_rtc::messages::PeerMessage::Offer`] containing
//! information necessary for the setup of the connection
//! - The initializer uses the offer to start its connection and replies with a
//! [`crate::web_rtc::messages::PeerMessage::Answer`], also containing information for the setting
//! up of the connection
//!
//! Once the first message has been sent, both nodes will start to exchange
//! [`crate::web_rtc::messages::PeerMessage::IceCandidate`] messages which contain information
//! about possible connection candidates between the two nodes.
//! These messages will even continue once the connection has been setup, and can be used for
//! example to create a nice handover when the network connection changes.
//!
//! After the connections are set up, only the `IceCandidate` messages are exchanged between the
//! nodes.
//!
//! # Usage of the signalling server
//!
//! You can find an example of how the signalling server is used in
//! <https://github.com/ineiti/fledger/tree/0.7.0/cli/flsignal/src/main.rs>

use bimap::{BiMap, Overwritten};
use rand::{rngs::StdRng, seq::IteratorRandom, SeedableRng};
use serde::{Deserialize, Serialize};
use serde_with::{hex::Hex, serde_as};
use std::{collections::HashMap, fmt::Formatter};

use crate::{flo::realm::RealmID, nodeconfig::NodeInfo, timer::Timer};
use flarch::{
    broker::{Broker, SubsystemHandler, TranslateFrom, TranslateInto},
    nodeids::{NodeID, U256},
    platform_async_trait,
    web_rtc::{
        messages::PeerInfo,
        websocket::{BrokerWSServer, WSServerIn, WSServerOut},
    },
};

pub type BrokerSignal = Broker<SignalIn, SignalOut>;

#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
pub struct FledgerConfig {
    pub system_realm: Option<RealmID>,
}

#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
pub struct SignalConfig {
    pub ttl_minutes: u16,
    pub system_realm: Option<RealmID>,
    pub max_list_len: Option<usize>,
}

#[derive(Clone, PartialEq)]
/// Messages for the signalling server
pub enum SignalIn {
    /// One minute timer clock for removing stale connections
    Timer,
    /// Message coming from the WebSocket server.
    WSServer(WSServerOut),
    /// Stop the signalling server
    Stop,
}

#[derive(Clone, Debug, PartialEq)]
/// Messages sent by the signalling server to an eventual listener
pub enum SignalOut {
    /// Statistics about the connected nodes
    NodeStats(Vec<NodeStat>),
    /// Whenever a new node has joined the signalling server
    NewNode(NodeID),
    /// Messages going to the WebSocketServer
    WSServer(WSServerIn),
    /// If the server has been stopped
    Stopped,
}

/// This implements a signalling server.
/// It can be used for tests, in the cli implementation, and
/// will also be used later directly in the network struct to allow for direct node-node setups.
///
/// It handles the setup phase where the nodes authenticate themselves to the server, and passes
/// PeerInfo messages between nodes.
/// It also handles statistics by forwarding NodeStats to a listener.
pub struct SignalServer {
    challenges: BiMap<U256, usize>,
    connection_ids: BiMap<NodeID, usize>,
    info: HashMap<U256, NodeInfo>,
    ttl: HashMap<usize, u16>,
    config: SignalConfig,
}

/// Our current version - will change if the API is incompatible.
pub const SIGNAL_VERSION: u64 = 3;

impl SignalServer {
    /// Creates a new [`SignalServer`].
    /// `ttl_minutes` is the minimum time an idle node will be
    /// kept in the list.
    pub async fn start(
        ws_server: BrokerWSServer,
        config: SignalConfig,
    ) -> anyhow::Result<BrokerSignal> {
        let mut broker = Self::new(config).await?;
        broker.link_bi(ws_server).await?;
        Timer::start()
            .await?
            .tick_minute(broker.clone(), SignalIn::Timer)
            .await?;
        Ok(broker)
    }

    pub async fn new(config: SignalConfig) -> anyhow::Result<BrokerSignal> {
        let mut broker = Broker::new();
        broker
            .add_handler(Box::new(SignalServer {
                challenges: BiMap::new(),
                connection_ids: BiMap::new(),
                info: HashMap::new(),
                ttl: HashMap::new(),
                config,
            }))
            .await?;
        Ok(broker)
    }

    fn msg_in(&mut self, msg_in: SignalIn) -> Vec<SignalOut> {
        match msg_in {
            SignalIn::Timer => {
                self.msg_in_timer();
                vec![]
            }
            SignalIn::WSServer(msg_wss) => self.msg_wss(msg_wss),
            SignalIn::Stop => vec![SignalOut::WSServer(WSServerIn::Stop)],
        }
    }

    fn msg_wss(&mut self, msg: WSServerOut) -> Vec<SignalOut> {
        match msg {
            WSServerOut::Message(index, msg_s) => {
                self.ttl
                    .entry(index.clone())
                    .and_modify(|ttl| *ttl = self.config.ttl_minutes);
                if let Ok(msg_ws) = serde_json::from_str::<WSSignalMessageFromNode>(&msg_s) {
                    return self.msg_ws_process(index, msg_ws);
                }
            }
            WSServerOut::NewConnection(index) => return self.msg_ws_connect(index),
            WSServerOut::Disconnection(id) => self.remove_node(id),
            WSServerOut::Stopped => return vec![SignalOut::Stopped],
        }
        vec![]
    }

    fn msg_in_timer(&mut self) {
        let mut to_remove = Vec::new();
        for (index, ttl) in self.ttl.iter_mut() {
            *ttl -= 1;
            if *ttl == 0 {
                log::info!("Removing idle node {index}");
                to_remove.push(*index);
            }
        }
        for id in to_remove {
            self.remove_node(id);
        }
    }

    // The id is the challange until the announcement succeeds. Then ws_announce calls
    // set_cb_message again to create a new callback using the node-id as id.
    fn msg_ws_process(&mut self, index: usize, msg: WSSignalMessageFromNode) -> Vec<SignalOut> {
        match msg {
            WSSignalMessageFromNode::Announce(ann) => self.ws_announce(index, ann),
            WSSignalMessageFromNode::ListIDsRequest => self.ws_list_ids(index),
            WSSignalMessageFromNode::PeerSetup(pi) => self.ws_peer_setup(index, pi),
            WSSignalMessageFromNode::NodeStats(ns) => self.ws_node_stats(ns),
        }
    }

    fn msg_ws_connect(&mut self, index: usize) -> Vec<SignalOut> {
        log::trace!("Sending challenge to new connection");
        let challenge = U256::rnd();
        self.challenges.insert(challenge, index);
        self.ttl.insert(index, self.config.ttl_minutes);
        let challenge_msg =
            serde_json::to_string(&WSSignalMessageToNode::Challenge(SIGNAL_VERSION, challenge))
                .unwrap();
        vec![SignalOut::WSServer(WSServerIn::Message(
            index,
            challenge_msg,
        ))]
    }

    fn ws_announce(&mut self, index: usize, msg: MessageAnnounce) -> Vec<SignalOut> {
        let challenge = match self.challenges.get_by_right(&index) {
            Some(id) => id.clone(),
            None => {
                log::warn!("Got an announcement message without challenge.");
                return vec![];
            }
        };
        if !msg.node_info.verify(&challenge.to_bytes(), &msg.signature) {
            log::warn!("Got node with wrong signature");
            return vec![];
        }
        let id = msg.node_info.get_id();
        let mut msgs = vec![];
        if let Overwritten::Left(_, old) = self.connection_ids.insert(id, index) {
            log::warn!("The same ID is already connected to this signalling server - sending kill signal to previous connection");
            msgs.append(&mut self.send_msg_node(
                old,
                WSSignalMessageToNode::Error("New Connection with same ID".into()),
            ))
        }

        log::info!("Registration of node-id {}: {}", id, msg.node_info.name);
        self.info.insert(id, msg.node_info);
        self.challenges.remove_by_left(&challenge);
        msgs.append(&mut self.send_msg_node(
            index,
            WSSignalMessageToNode::SystemConfig(FledgerConfig {
                system_realm: self.config.system_realm.clone(),
            }),
        ));
        let list = self
            .info
            .iter()
            .map(|(_, info)| info.clone())
            .collect::<Vec<_>>();
        for id in self.connection_ids.iter() {
            if id.1 != &index {
                msgs.append(
                    &mut self
                        .send_msg_node(*id.1, WSSignalMessageToNode::ListIDsReply(list.clone())),
                )
            }
        }
        msgs.push(SignalOut::NewNode(id));
        msgs
    }

    fn ws_list_ids(&mut self, id: usize) -> Vec<SignalOut> {
        let mut rng = StdRng::seed_from_u64(id as u64);
        let max_size = self.config.max_list_len.unwrap_or(self.info.len());
        let list = self
            .info
            .values()
            .cloned()
            .choose_multiple(&mut rng, max_size);

        self.send_msg_node(id, WSSignalMessageToNode::ListIDsReply(list))
    }

    fn ws_peer_setup(&mut self, index: usize, pi: PeerInfo) -> Vec<SignalOut> {
        let id = match self.connection_ids.get_by_right(&index) {
            Some(id) => id,
            None => {
                log::warn!("Got a peer-setup message without challenge.");
                return vec![];
            }
        };
        log::trace!("Node {} sent peer setup: {:?}", id, pi);
        if let Some(dst) = pi.get_remote(id) {
            if let Some(dst_index) = self.connection_ids.get_by_left(&dst) {
                return self.send_msg_node(*dst_index, WSSignalMessageToNode::PeerSetup(pi));
            }
        }
        vec![]
    }

    fn ws_node_stats(&mut self, ns: Vec<NodeStat>) -> Vec<SignalOut> {
        vec![SignalOut::NodeStats(ns)]
    }

    fn send_msg_node(&self, index: usize, msg: WSSignalMessageToNode) -> Vec<SignalOut> {
        vec![SignalOut::WSServer(WSServerIn::Message(
            index,
            serde_json::to_string(&msg).unwrap(),
        ))]
    }

    fn remove_node(&mut self, index: usize) {
        log::info!("Removing node {index} from {:?}", self.info);
        self.challenges.remove_by_right(&index);
        if let Some((id, _)) = self.connection_ids.remove_by_right(&index) {
            self.info.remove(&id);
        }
        log::info!("Info is now: {:?}", self.info);
        self.ttl.remove(&index);
    }
}

#[platform_async_trait()]
impl SubsystemHandler<SignalIn, SignalOut> for SignalServer {
    async fn messages(&mut self, msgs: Vec<SignalIn>) -> Vec<SignalOut> {
        msgs.into_iter().flat_map(|msg| self.msg_in(msg)).collect()
    }
}

impl TranslateFrom<WSServerOut> for SignalIn {
    fn translate(msg: WSServerOut) -> Option<Self> {
        Some(SignalIn::WSServer(msg))
    }
}
impl TranslateInto<WSServerIn> for SignalOut {
    fn translate(self) -> Option<WSServerIn> {
        if let SignalOut::WSServer(msg_wss) = self {
            Some(msg_wss)
        } else {
            None
        }
    }
}

/// Message is a list of messages to be sent between the node and the signal server.
///
/// When a new node connects to the signalling server, the server starts by sending
/// a "Challenge" to the node.
/// The node can then announce itself using that challenge.
/// - Challenge is sent by the signalling server to the node
/// - ListIDsReply contains the NodeInfos of the currently connected nodes
/// - PeerRequest is sent by a node to ask to connect to another node. The
/// server will send a 'PeerReply' to the corresponding node, which will continue
/// the protocol by sending its own PeerRequest.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
pub enum WSSignalMessageToNode {
    /// Sends the current version of the signalling server and a random 256 bit number for authentication
    Challenge(u64, U256),
    /// A list of currently connected nodes
    ListIDsReply(Vec<NodeInfo>),
    /// Information for setting up a WebRTC connection
    PeerSetup(PeerInfo),
    /// General configuration for the system
    SystemConfig(FledgerConfig),
    /// An error occured in the signalling server
    Error(String),
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
/// A message coming from a node
pub enum WSSignalMessageFromNode {
    /// The reply to a [`WSSignalMessageToNode::Challenge`] message, containing information about the
    /// node, as well as a signature on the random 256 bit number being sent in the challenge.
    Announce(MessageAnnounce),
    /// Request an updated list of the currently connected nodes
    ListIDsRequest,
    /// Connection information for setting up a WebRTC connection
    PeerSetup(PeerInfo),
    /// Some statistics about its connections from the remote node
    NodeStats(Vec<NodeStat>),
}

impl std::fmt::Display for WSSignalMessageToNode {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            WSSignalMessageToNode::Challenge(_, _) => write!(f, "Challenge"),
            WSSignalMessageToNode::ListIDsReply(_) => write!(f, "ListIDsReply"),
            WSSignalMessageToNode::PeerSetup(_) => write!(f, "PeerSetup"),
            WSSignalMessageToNode::SystemConfig(_) => write!(f, "SystemConfig"),
            WSSignalMessageToNode::Error(_) => write!(f, "Error"),
        }
    }
}

impl std::fmt::Display for WSSignalMessageFromNode {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            WSSignalMessageFromNode::Announce(_) => write!(f, "Announce"),
            WSSignalMessageFromNode::ListIDsRequest => write!(f, "ListIDsRequest"),
            WSSignalMessageFromNode::PeerSetup(_) => write!(f, "PeerSetup"),
            WSSignalMessageFromNode::NodeStats(_) => write!(f, "NodeStats"),
        }
    }
}

#[serde_as]
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
/// What a node sends when it connects to the signalling server
pub struct MessageAnnounce {
    /// The signalling server version the node knows
    pub version: u64,
    /// The challenge the server sent to this node.
    /// If the challenge is not correct, nothing happens, and the server waits for the
    /// correct challenge to arrive
    pub challenge: U256,
    /// Information about the remote node.
    /// The [`NodeInfo.get_id()`] will be used as the ID of this node.
    pub node_info: NodeInfo,
    #[serde_as(as = "Hex")]
    /// The signature of the challenge with the private key of the node.
    pub signature: Vec<u8>,
}

#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
/// Some statistics about the connections to other nodes.
pub struct NodeStat {
    /// The id of the remote node
    pub id: NodeID,
    /// Some version
    pub version: String,
    /// The round-trip time in milliseconds
    pub ping_ms: u32,
    /// How many ping-packets have been received from this node
    pub ping_rx: u32,
}

impl std::fmt::Debug for SignalIn {
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
        match self {
            SignalIn::WSServer(_) => write!(f, "WebSocket"),
            SignalIn::Timer => write!(f, "Timer"),
            SignalIn::Stop => write!(f, "Stop"),
        }
    }
}