ng_net/
broker.rs

1/*
2 * Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers
3 * All rights reserved.
4 * Licensed under the Apache License, Version 2.0
5 * <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
6 * or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
7 * at your option. All files in the project carrying such
8 * notice may not be copied, modified, or distributed except
9 * according to those terms.
10*/
11
12//! Broker singleton present in every instance of NextGraph (Client, Server, Core node)
13
14use std::collections::HashMap;
15#[cfg(not(target_arch = "wasm32"))]
16use std::collections::HashSet;
17
18use async_std::stream::StreamExt;
19#[cfg(not(target_arch = "wasm32"))]
20use async_std::sync::Mutex;
21use async_std::sync::{Arc, RwLock};
22use either::Either;
23use futures::channel::mpsc;
24use futures::channel::mpsc::UnboundedSender;
25use futures::SinkExt;
26use once_cell::sync::Lazy;
27
28use ng_repo::errors::*;
29use ng_repo::log::*;
30use ng_repo::types::*;
31
32use crate::actor::SoS;
33use crate::connection::*;
34use crate::server_broker::IServerBroker;
35use crate::types::*;
36use crate::utils::spawn_and_log_error;
37use crate::utils::{Receiver, ResultSend, Sender};
38
39#[doc(hidden)]
40#[derive(Debug, Clone)]
41pub enum ClientPeerId {
42    Local((UserId, DirectPeerId)),
43    Remote(DirectPeerId),
44}
45
46impl ClientPeerId {
47    pub fn key(&self) -> &DirectPeerId {
48        match self {
49            Self::Remote(dpi) => dpi,
50            Self::Local((_user, dpi)) => dpi,
51        }
52    }
53    pub fn value(&self) -> Option<UserId> {
54        match self {
55            Self::Remote(_) => None,
56            Self::Local((user, _)) => Some(*user),
57        }
58    }
59    pub fn new_from(peer: &DirectPeerId, local_user: &Option<UserId>) -> Self {
60        match local_user {
61            Some(user) => ClientPeerId::Local((*user, *peer)),
62            None => ClientPeerId::Remote(*peer),
63        }
64    }
65}
66
67#[derive(Debug)]
68enum PeerConnection {
69    Core(BindAddress),
70    Client(ConnectionBase),
71    Local(LocalTransport),
72    NONE,
73}
74
75#[derive(Debug)]
76struct LocalTransport {
77    #[allow(dead_code)]
78    client_peer_id: DirectPeerId,
79    client_cnx: ConnectionBase,
80    server_cnx: ConnectionBase,
81}
82
83impl LocalTransport {
84    async fn close(&mut self) {
85        self.client_cnx.close().await;
86        self.server_cnx.close().await;
87    }
88}
89
90#[derive(Debug)]
91struct BrokerPeerInfo {
92    #[allow(dead_code)]
93    last_peer_advert: Option<PeerAdvert>, //FIXME: remove Option
94    connected: PeerConnection,
95}
96
97#[derive(Debug)]
98#[allow(dead_code)]
99struct DirectConnection {
100    addr: BindAddress,
101    remote_peer_id: X25519PrivKey,
102    tp: TransportProtocol,
103    //dir: ConnectionDir,
104    cnx: ConnectionBase,
105}
106
107#[derive(Debug)]
108pub struct ServerConfig {
109    pub overlays_configs: Vec<BrokerOverlayConfigV0>,
110    pub registration: RegistrationConfig,
111    pub admin_user: Option<PubKey>,
112    pub peer_id: PubKey,
113    // when creating invitation links, an optional url to redirect the user to can be used, for accepting ToS and making payment, if any.
114    pub registration_url: Option<String>,
115    pub bootstrap: BootstrapContent,
116}
117
118pub enum LocalBrokerMessage {
119    Deliver {
120        event: Event,
121        overlay: OverlayId,
122        user: UserId,
123    },
124    Disconnected {
125        user_id: UserId,
126    },
127}
128
129pub static BROKER: Lazy<Arc<RwLock<Broker>>> = Lazy::new(|| Arc::new(RwLock::new(Broker::new())));
130
131pub struct Broker {
132    direct_connections: HashMap<BindAddress, DirectConnection>,
133    /// tuple of optional userId and peer key in montgomery form. userId is always None on the server side (except for local transport).
134    peers: HashMap<(Option<PubKey>, Option<X25519PubKey>), BrokerPeerInfo>,
135    /// (local,remote) -> ConnectionBase
136    anonymous_connections: HashMap<(BindAddress, BindAddress), ConnectionBase>,
137
138    config: Option<ServerConfig>,
139    shutdown: Option<Receiver<ProtocolError>>,
140    shutdown_sender: Sender<ProtocolError>,
141    closing: bool,
142    server_broker: Option<Arc<RwLock<dyn IServerBroker + Send + Sync>>>,
143
144    //local_broker: Option<Box<dyn ILocalBroker + Send + Sync + 'a>>,
145    local_broker: Option<UnboundedSender<LocalBrokerMessage>>,
146
147    #[cfg(not(target_arch = "wasm32"))]
148    listeners: HashMap<String, ListenerInfo>,
149    #[cfg(not(target_arch = "wasm32"))]
150    bind_addresses: HashMap<BindAddress, String>,
151    #[cfg(not(target_arch = "wasm32"))]
152    users_peers: HashMap<UserId, HashSet<Option<X25519PubKey>>>,
153}
154
155impl Broker {
156    // pub fn init_local_broker(
157    //     &mut self,
158    //     base_path: Option<PathBuf>,
159    //     in_memory: bool,
160    // ) -> Result<(), NgError> {
161    //     if in_memory && base_path.is_some() {
162    //         return Err(NgError::InvalidArgument);
163    //     }
164    //     self.base_path = base_path;
165    //     self.in_memory = in_memory;
166    //     Ok(())
167    // }
168
169    // pub fn register_last_seq_function(&mut self, function: Box<LastSeqFn>) {
170    //     if self.last_seq_function.is_none() {
171    //         self.last_seq_function = Some(function);
172    //     }
173    // }
174
175    pub fn get_server_peer_id(&self) -> DirectPeerId {
176        self.config.as_ref().unwrap().peer_id
177    }
178
179    pub(crate) fn get_config(&self) -> Option<&ServerConfig> {
180        self.config.as_ref()
181    }
182
183    pub(crate) fn get_registration_url(&self) -> Option<&String> {
184        self.config
185            .as_ref()
186            .and_then(|c| c.registration_url.as_ref())
187    }
188
189    pub(crate) fn get_bootstrap(&self) -> Result<&BootstrapContent, ProtocolError> {
190        self.config
191            .as_ref()
192            .map(|c| &c.bootstrap)
193            .ok_or(ProtocolError::BrokerError)
194    }
195
196    #[doc(hidden)]
197    pub fn set_server_broker(&mut self, broker: impl IServerBroker + 'static) {
198        //log_debug!("set_server_broker");
199        self.server_broker = Some(Arc::new(RwLock::new(broker)));
200    }
201
202    #[doc(hidden)]
203    pub fn set_local_broker(&mut self, pump: UnboundedSender<LocalBrokerMessage>) {
204        //log_debug!("set_local_broker");
205        self.local_broker = Some(pump);
206    }
207
208    pub fn set_server_config(&mut self, config: ServerConfig) {
209        self.config = Some(config);
210    }
211
212    #[cfg(not(target_arch = "wasm32"))]
213    pub fn set_listeners(
214        &mut self,
215        listeners: HashMap<String, ListenerInfo>,
216    ) -> (HashMap<String, ListenerInfo>, HashMap<BindAddress, String>) {
217        for entry in listeners.iter() {
218            for ba in entry.1.addrs.iter() {
219                self.bind_addresses.insert(ba.clone(), entry.0.clone());
220            }
221        }
222        self.listeners.extend(listeners);
223        let mut copy_listeners: HashMap<String, ListenerInfo> = HashMap::new();
224        let mut copy_bind_addresses: HashMap<BindAddress, String> = HashMap::new();
225        copy_listeners.clone_from(&self.listeners);
226        copy_bind_addresses.clone_from(&self.bind_addresses);
227        (copy_listeners, copy_bind_addresses)
228    }
229
230    #[doc(hidden)]
231    pub fn get_server_broker(
232        &self,
233    ) -> Result<Arc<RwLock<dyn IServerBroker + Send + Sync>>, ProtocolError> {
234        //log_debug!("GET STORAGE {:?}", self.server_storage);
235        Ok(Arc::clone(
236            self.server_broker
237                .as_ref()
238                .ok_or(ProtocolError::BrokerError)?,
239        ))
240    }
241
242    // pub(crate) fn get_server_broker_mut(
243    //     &mut self,
244    // ) -> Result<&mut Box<dyn IServerBroker + Send + Sync>, ProtocolError> {
245    //     //log_debug!("GET STORAGE {:?}", self.server_storage);
246    //     self.server_broker
247    //         .as_mut()
248    //         .ok_or(ProtocolError::BrokerError)
249    // }
250
251    //Option<Arc<RwLock<dyn ILocalBroker>>>,
252    pub(crate) fn get_local_broker(
253        &self,
254    ) -> Result<UnboundedSender<LocalBrokerMessage>, ProtocolError> {
255        Ok(self
256            .local_broker
257            .as_ref()
258            .ok_or(ProtocolError::NoLocalBrokerFound)?
259            .clone())
260    }
261
262    #[cfg(not(target_arch = "wasm32"))]
263    pub(crate) async fn authorize(
264        &self,
265        bind_addresses: &(BindAddress, BindAddress),
266        auth: Authorization,
267    ) -> Result<(), ProtocolError> {
268        let listener_id = self
269            .bind_addresses
270            .get(&bind_addresses.0)
271            .ok_or(ProtocolError::BrokerError)?;
272        let listener = self
273            .listeners
274            .get(listener_id)
275            .ok_or(ProtocolError::BrokerError)?;
276        match auth {
277            Authorization::Discover => {
278                if listener.config.discoverable
279                    && bind_addresses.1.ip.is_private()
280                    && listener.config.accept_forward_for.is_no()
281                {
282                    Ok(())
283                } else {
284                    Err(ProtocolError::AccessDenied)
285                }
286            }
287            Authorization::ExtMessage => Err(ProtocolError::AccessDenied),
288            Authorization::Client(user_and_registration) => {
289                if user_and_registration.1.is_some() {
290                    // user wants to register
291                    let lock = self.get_server_broker()?;
292                    {
293                        let storage = lock.read().await;
294                        if storage.get_user(user_and_registration.0).is_ok() {
295                            return Ok(());
296                        }
297                    }
298                    {
299                        let mut storage = lock.write().await;
300                        if storage.has_no_user()? {
301                            let code = user_and_registration.1.unwrap().unwrap();
302                            let inv_type = storage.get_invitation_type(code)?;
303                            if inv_type == 3u8 {
304                                // it is a setup invite
305                                // TODO send (return here) master_key to client (so they can save it in their wallet)
306                                let _master_key = storage.take_master_key()?;
307                                // TODO save remote_boot (in server.path)
308                                storage.add_user(user_and_registration.0, true)?;
309                                storage.remove_invitation(code)?;
310                                return Ok(());
311                            }
312                            return Err(ProtocolError::InvalidState);
313                        }
314                    }
315                    let storage = lock.read().await;
316                    if let Some(ServerConfig {
317                        registration: reg, ..
318                    }) = &self.config
319                    {
320                        return match reg {
321                            RegistrationConfig::Closed => return Err(ProtocolError::AccessDenied),
322                            RegistrationConfig::Invitation => {
323                                // registration is only possible with an invitation code
324                                if user_and_registration.1.unwrap().is_none() {
325                                    Err(ProtocolError::InvitationRequired)
326                                } else {
327                                    let code = user_and_registration.1.unwrap().unwrap();
328                                    let inv_type = storage.get_invitation_type(code)?;
329                                    storage.add_user(user_and_registration.0, inv_type == 2u8)?;
330                                    storage.remove_invitation(code)?;
331                                    Ok(())
332                                }
333                            }
334                            RegistrationConfig::Open => {
335                                // registration is open (no need for invitation. anybody can register)
336                                let mut is_admin = false;
337                                if user_and_registration.1.unwrap().is_some() {
338                                    // but if there is an invitation code and it says the user should be admin, then we take that into account
339                                    let code = user_and_registration.1.unwrap().unwrap();
340                                    let inv_type = storage.get_invitation_type(code)?;
341                                    if inv_type == 2u8 {
342                                        // admin
343                                        is_admin = true;
344                                        storage.remove_invitation(code)?;
345                                    } else if inv_type == 1u8 {
346                                        storage.remove_invitation(code)?;
347                                    }
348                                }
349                                storage.add_user(user_and_registration.0, is_admin)?;
350                                Ok(())
351                            }
352                        };
353                    } else {
354                        return Err(ProtocolError::BrokerError);
355                    }
356                }
357                // if user doesn't want to register, we accept everything, as perms will be checked later on, once the overlayId is known
358                Ok(())
359            }
360            Authorization::Core => Err(ProtocolError::AccessDenied),
361            Authorization::Admin(admin_user) => {
362                if listener.config.accepts_client() {
363                    if let Some(ServerConfig {
364                        admin_user: Some(admin),
365                        ..
366                    }) = self.config
367                    {
368                        if admin == admin_user {
369                            return Ok(());
370                        }
371                    }
372                    let found = self.get_server_broker()?.read().await.get_user(admin_user);
373                    if found.is_ok() && found.unwrap() {
374                        return Ok(());
375                    }
376                }
377                Err(ProtocolError::AccessDenied)
378            }
379            Authorization::OverlayJoin(_) => Err(ProtocolError::AccessDenied),
380        }
381    }
382
383    fn reconnecting(&mut self, peer_id: X25519PrivKey, user: Option<PubKey>) {
384        let peerinfo = self.peers.get_mut(&(user, Some(peer_id)));
385        match peerinfo {
386            Some(info) => match &info.connected {
387                PeerConnection::NONE => {}
388                PeerConnection::Client(_cb) => {
389                    info.connected = PeerConnection::NONE;
390                }
391                PeerConnection::Core(ip) => {
392                    self.direct_connections.remove(&ip);
393                    info.connected = PeerConnection::NONE;
394                }
395                PeerConnection::Local(_) => {
396                    panic!("local transport connections cannot disconnect. shouldn't reconnect")
397                }
398            },
399            None => {}
400        }
401    }
402
403    async fn remove_peer_id(&mut self, peer_id: X25519PrivKey, user: Option<PubKey>) {
404        self.remove_peer_id_(Some(peer_id), user).await
405    }
406
407    #[allow(dead_code)]
408    async fn remove_local_transport(&mut self, user: PubKey) {
409        self.remove_peer_id_(None, Some(user)).await
410    }
411
412    async fn remove_peer_id_(&mut self, peer_id: Option<X25519PrivKey>, user: Option<PubKey>) {
413        let removed = self.peers.remove(&(user, peer_id));
414        match removed {
415            Some(info) => match info.connected {
416                PeerConnection::NONE => {}
417                PeerConnection::Client(mut _cb) => {
418                    #[cfg(not(target_arch = "wasm32"))]
419                    if user.is_none() {
420                        _cb.release_shutdown();
421                        // server side
422                        if let Some(fsm) = _cb.fsm {
423                            if let Ok(user) = fsm.lock().await.user_id() {
424                                let _ = self
425                                    .remove_user_peer(&user, &Some(peer_id.to_owned().unwrap()));
426                            }
427                        }
428                        let peer = PubKey::X25519PubKey(peer_id.unwrap());
429                        log_debug!("unsubscribing peer {}", peer);
430                        self.get_server_broker()
431                            .unwrap()
432                            .read()
433                            .await
434                            .remove_all_subscriptions_of_client(&ClientPeerId::new_from(
435                                &peer, &user,
436                            ))
437                            .await;
438                    }
439                }
440                PeerConnection::Core(ip) => {
441                    self.direct_connections.remove(&ip);
442                }
443                PeerConnection::Local(_lt) => {
444                    #[cfg(not(target_arch = "wasm32"))]
445                    if peer_id.is_none() && user.is_some() {
446                        // server side
447                        let _ = self.remove_user_peer(user.as_ref().unwrap(), &None);
448
449                        log_debug!("unsubscribing local peer {}", _lt.client_peer_id);
450                        self.get_server_broker()
451                            .unwrap()
452                            .read()
453                            .await
454                            .remove_all_subscriptions_of_client(&ClientPeerId::new_from(
455                                &_lt.client_peer_id,
456                                &user,
457                            ))
458                            .await;
459                    }
460                }
461            },
462            None => {}
463        }
464    }
465
466    #[cfg(not(target_arch = "wasm32"))]
467    fn remove_anonymous(
468        &mut self,
469        remote_bind_address: BindAddress,
470        local_bind_address: BindAddress,
471    ) {
472        let removed = self
473            .anonymous_connections
474            .remove(&(local_bind_address, remote_bind_address));
475        if removed.is_some() {
476            removed.unwrap().release_shutdown();
477        }
478    }
479
480    // #[cfg(not(target_arch = "wasm32"))]
481    // pub fn test_storage(&self, path: PathBuf) {
482    //     use ng_storage_rocksdb::kcv_store::RocksDbKCVStorage;
483
484    //     let key: [u8; 32] = [0; 32];
485    //     let test_storage = RocksDbKCVStorage::open(&path, key);
486    //     match test_storage {
487    //         Err(e) => {
488    //             log_debug!("storage error {}", e);
489    //         }
490    //         Ok(_) => {
491    //             log_debug!("storage ok");
492    //         }
493    //     }
494    // }
495
496    fn new() -> Self {
497        let (shutdown_sender, shutdown_receiver) = mpsc::unbounded::<ProtocolError>();
498        let mut random_buf = [0u8; 4];
499        getrandom::getrandom(&mut random_buf).unwrap();
500
501        Broker {
502            anonymous_connections: HashMap::new(),
503            config: None,
504            shutdown: Some(shutdown_receiver),
505            shutdown_sender,
506            direct_connections: HashMap::new(),
507            peers: HashMap::new(),
508            closing: false,
509            server_broker: None,
510            local_broker: None,
511
512            #[cfg(not(target_arch = "wasm32"))]
513            listeners: HashMap::new(),
514            #[cfg(not(target_arch = "wasm32"))]
515            bind_addresses: HashMap::new(),
516            #[cfg(not(target_arch = "wasm32"))]
517            users_peers: HashMap::new(),
518        }
519    }
520
521    fn take_shutdown(&mut self) -> Result<Receiver<ProtocolError>, ProtocolError> {
522        self.shutdown.take().ok_or(ProtocolError::BrokerError)
523    }
524
525    pub async fn join_shutdown() -> Result<(), ProtocolError> {
526        let mut shutdown_join: Receiver<ProtocolError>;
527        {
528            shutdown_join = BROKER.write().await.take_shutdown()?;
529        }
530        match shutdown_join.next().await {
531            Some(ProtocolError::Closing) => Ok(()),
532            Some(error) => Err(error),
533            None => Ok(()),
534        }
535    }
536
537    /// Used in tests mostly
538    pub async fn join_shutdown_with_timeout(
539        timeout: std::time::Duration,
540    ) -> Result<(), ProtocolError> {
541        async fn timer_shutdown(timeout: std::time::Duration) -> ResultSend<()> {
542            async move {
543                sleep!(timeout);
544                log_debug!("timeout for shutdown");
545                let _ = BROKER
546                    .write()
547                    .await
548                    .shutdown_sender
549                    .send(ProtocolError::Timeout)
550                    .await;
551            }
552            .await;
553            Ok(())
554        }
555        spawn_and_log_error(timer_shutdown(timeout));
556        Broker::join_shutdown().await
557    }
558
559    pub async fn graceful_shutdown() {
560        let peer_ids;
561        let anonymous;
562        {
563            let mut broker = BROKER.write().await;
564            if broker.closing {
565                return;
566            }
567            broker.closing = true;
568            peer_ids = Vec::from_iter(broker.peers.keys().cloned());
569            anonymous = Vec::from_iter(broker.anonymous_connections.keys().cloned());
570        }
571        for peer_id in peer_ids {
572            BROKER
573                .write()
574                .await
575                .close_peer_connection_x(peer_id.1, peer_id.0)
576                .await;
577        }
578        for anon in anonymous {
579            BROKER.write().await.close_anonymous(anon.1, anon.0).await;
580        }
581        let _ = BROKER
582            .write()
583            .await
584            .shutdown_sender
585            .send(ProtocolError::Closing)
586            .await;
587    }
588
589    pub async fn close_all_connections() {
590        let peer_ids;
591        let anonymous;
592        {
593            let broker = BROKER.write().await;
594            if broker.closing {
595                return;
596            }
597            peer_ids = Vec::from_iter(broker.peers.keys().cloned());
598            anonymous = Vec::from_iter(broker.anonymous_connections.keys().cloned());
599        }
600        for peer_id in peer_ids {
601            if peer_id.1.is_some() {
602                BROKER
603                    .write()
604                    .await
605                    .close_peer_connection_x(peer_id.1, peer_id.0)
606                    .await;
607            }
608        }
609        for anon in anonymous {
610            BROKER.write().await.close_anonymous(anon.1, anon.0).await;
611        }
612    }
613
614    #[allow(dead_code)]
615    #[cfg(not(target_arch = "wasm32"))]
616    async fn shutdown(&mut self) {
617        if self.closing {
618            return;
619        }
620        self.closing = true;
621
622        let _ = self.shutdown_sender.send(ProtocolError::Closing).await;
623    }
624
625    #[doc(hidden)]
626    #[cfg(not(target_arch = "wasm32"))]
627    pub async fn accept(
628        &mut self,
629        mut connection: ConnectionBase,
630        remote_bind_address: BindAddress,
631        local_bind_address: BindAddress,
632    ) -> Result<(), NetError> {
633        if self.closing {
634            return Err(NetError::Closing);
635        }
636
637        let join: mpsc::UnboundedReceiver<Either<NetError, X25519PrivKey>> =
638            connection.take_shutdown();
639        if self
640            .anonymous_connections
641            .insert((local_bind_address, remote_bind_address), connection)
642            .is_some()
643        {
644            log_err!(
645                "internal error. duplicate connection {:?} {:?}",
646                local_bind_address,
647                remote_bind_address
648            );
649        }
650
651        async fn watch_close(
652            mut join: Receiver<Either<NetError, X25519PrivKey>>,
653            remote_bind_address: BindAddress,
654            local_bind_address: BindAddress,
655        ) -> ResultSend<()> {
656            async move {
657                let res = join.next().await;
658                match res {
659                    Some(Either::Right(remote_peer_id)) => {
660                        let _res = join.next().await;
661
662                        // if res.is_some()
663                        //     && res.as_ref().unwrap().as_ref().unwrap_left() == &NetError::Closing
664                        // {
665                        //     return;
666                        // }
667                        log_debug!("SOCKET IS CLOSED {:?} peer_id: {:?}", res, remote_peer_id);
668                        BROKER
669                            .write()
670                            .await
671                            .remove_peer_id(remote_peer_id, None)
672                            .await;
673                    }
674                    _ => {
675                        log_debug!(
676                            "SOCKET IS CLOSED {:?} remote: {:?} local: {:?}",
677                            res,
678                            remote_bind_address,
679                            local_bind_address
680                        );
681                        BROKER
682                            .write()
683                            .await
684                            .remove_anonymous(remote_bind_address, local_bind_address);
685                    }
686                }
687            }
688            .await;
689            Ok(())
690        }
691        spawn_and_log_error(watch_close(join, remote_bind_address, local_bind_address));
692
693        Ok(())
694    }
695
696    #[cfg(not(target_arch = "wasm32"))]
697    fn add_user_peer(
698        &mut self,
699        user: UserId,
700        peer: Option<X25519PrivKey>,
701    ) -> Result<(), ProtocolError> {
702        let peers_set = self
703            .users_peers
704            .entry(user)
705            .or_insert(HashSet::with_capacity(1));
706
707        if !peers_set.insert(peer) {
708            //return Err(ProtocolError::PeerAlreadyConnected);
709        }
710        Ok(())
711    }
712
713    #[cfg(not(target_arch = "wasm32"))]
714    fn remove_user_peer(
715        &mut self,
716        user: &UserId,
717        peer: &Option<X25519PrivKey>,
718    ) -> Result<(), ProtocolError> {
719        let peers_set = self
720            .users_peers
721            .get_mut(user)
722            .ok_or(ProtocolError::UserNotConnected)?;
723
724        if !peers_set.remove(peer) {
725            return Err(ProtocolError::PeerNotConnected);
726        }
727        if peers_set.is_empty() {
728            let _ = self.users_peers.remove(user);
729        }
730        Ok(())
731    }
732
733    #[cfg(not(target_arch = "wasm32"))]
734    pub(crate) async fn attach_and_authorize_app(
735        &mut self,
736        remote_bind_address: BindAddress,
737        local_bind_address: BindAddress,
738        remote_peer_id: X25519PrivKey,
739        user: &Option<UserId>,
740        _info: &ClientInfo,
741    ) -> Result<(), ProtocolError> {
742        let already = self.peers.get(&(None, Some(remote_peer_id)));
743        if already.is_some() {
744            match already.unwrap().connected {
745                PeerConnection::NONE => {}
746                _ => {
747                    return Err(ProtocolError::PeerAlreadyConnected);
748                }
749            };
750        }
751
752        //TODO: check permissions for user/remote_bind_address or headless if no user
753
754        //TODO: keep the info
755
756        let mut connection = self
757            .anonymous_connections
758            .remove(&(local_bind_address, remote_bind_address))
759            .ok_or(ProtocolError::BrokerError)?;
760
761        connection.reset_shutdown(remote_peer_id).await;
762
763        if user.is_some() {
764            self.add_user_peer(user.unwrap(), Some(remote_peer_id))?;
765        }
766
767        let connected = PeerConnection::Client(connection);
768
769        let bpi = BrokerPeerInfo {
770            last_peer_advert: None,
771            connected,
772        };
773        self.peers.insert((None, Some(remote_peer_id)), bpi);
774
775        Ok(())
776    }
777
778    #[cfg(not(target_arch = "wasm32"))]
779    pub(crate) async fn attach_and_authorize_peer_id(
780        &mut self,
781        remote_bind_address: BindAddress,
782        local_bind_address: BindAddress,
783        remote_peer_id: X25519PrivKey,
784        // if client is None it means we are Core mode
785        client: Option<ClientAuthContentV0>,
786        fsm: &mut NoiseFSM,
787    ) -> Result<(), ProtocolError> {
788        log_debug!("ATTACH PEER_ID {:?}", remote_peer_id);
789
790        let already = self.peers.remove(&(None, Some(remote_peer_id)));
791        if already.is_some() {
792            match already.unwrap().connected {
793                PeerConnection::NONE => {}
794                PeerConnection::Client(mut cnx) => {
795                    cnx.close_silently().await;
796                }
797                _ => {}
798            };
799        }
800
801        // find the listener
802        let listener_id = self
803            .bind_addresses
804            .get(&local_bind_address)
805            .ok_or(ProtocolError::AccessDenied)?;
806        let listener = self
807            .listeners
808            .get(listener_id)
809            .ok_or(ProtocolError::AccessDenied)?;
810
811        // authorize
812        let is_core = if client.is_none() {
813            // it is a Core connection
814            if !listener.config.is_core() {
815                return Err(ProtocolError::AccessDenied);
816            }
817            true
818        } else {
819            if !listener.config.accepts_client() {
820                return Err(ProtocolError::AccessDenied);
821            }
822            let client = client.as_ref().unwrap();
823            self.authorize(
824                &(local_bind_address, remote_bind_address),
825                Authorization::Client((client.user.clone(), client.registration.clone())),
826            )
827            .await?;
828
829            // TODO add client to storage
830            false
831        };
832
833        let mut connection = self
834            .anonymous_connections
835            .remove(&(local_bind_address, remote_bind_address))
836            .ok_or(ProtocolError::BrokerError)?;
837
838        connection.reset_shutdown(remote_peer_id).await;
839        let connected = if !is_core {
840            let user = client.unwrap().user;
841            fsm.set_user_id(user);
842            self.add_user_peer(user, Some(remote_peer_id))?;
843
844            PeerConnection::Client(connection)
845        } else {
846            let dc = DirectConnection {
847                addr: remote_bind_address,
848                remote_peer_id,
849                tp: connection.transport_protocol(),
850                cnx: connection,
851            };
852            self.direct_connections.insert(remote_bind_address, dc);
853            PeerConnection::Core(remote_bind_address)
854        };
855        let bpi = BrokerPeerInfo {
856            last_peer_advert: None,
857            connected,
858        };
859        self.peers.insert((None, Some(remote_peer_id)), bpi);
860
861        Ok(())
862    }
863
864    pub async fn probe(
865        &mut self,
866        cnx: Box<dyn IConnect>,
867        ip: IP,
868        port: u16,
869    ) -> Result<Option<PubKey>, ProtocolError> {
870        if self.closing {
871            return Err(ProtocolError::Closing);
872        }
873        cnx.probe(ip, port).await
874    }
875
876    pub async fn admin<
877        A: Into<ProtocolMessage>
878            + Into<AdminRequestContentV0>
879            + std::fmt::Debug
880            + Sync
881            + Send
882            + 'static,
883    >(
884        &mut self,
885        cnx: Box<dyn IConnect>,
886        peer_privk: PrivKey,
887        peer_pubk: PubKey,
888        remote_peer_id: DirectPeerId,
889        user: PubKey,
890        user_priv: PrivKey,
891        addr: BindAddress,
892        request: A,
893    ) -> Result<AdminResponseContentV0, ProtocolError> {
894        let config = StartConfig::Admin(AdminConfig {
895            user,
896            user_priv,
897            addr,
898            request: request.into(),
899        });
900        let remote_peer_id_dh = remote_peer_id.to_dh_from_ed();
901
902        let mut connection = cnx
903            .open(
904                config.get_url(),
905                peer_privk.clone(),
906                peer_pubk,
907                remote_peer_id_dh,
908                config.clone(),
909            )
910            .await?;
911
912        connection.admin::<A>().await
913    }
914
915    pub async fn ext<
916        A: Into<ProtocolMessage> + Into<ExtRequestContentV0> + std::fmt::Debug + Sync + Send + 'static,
917        B: TryFrom<ProtocolMessage, Error = ProtocolError> + std::fmt::Debug + Sync + Send + 'static,
918    >(
919        cnx: Box<dyn IConnect>,
920        peer_privk: PrivKey,
921        peer_pubk: PubKey,
922        remote_peer_id: DirectPeerId,
923        url: String,
924        request: A,
925    ) -> Result<B, NgError> {
926        let config = StartConfig::Ext(ExtConfig {
927            url,
928            request: request.into(),
929        });
930        let remote_peer_id_dh = remote_peer_id.to_dh_from_ed();
931        let mut connection = cnx
932            .open(
933                config.get_url(),
934                peer_privk.clone(),
935                peer_pubk,
936                remote_peer_id_dh,
937                config.clone(),
938            )
939            .await?;
940        connection.ext::<A, B>().await
941    }
942
943    #[doc(hidden)]
944    pub fn connect_local(&mut self, peer_pubk: PubKey, user: UserId) -> Result<(), ProtocolError> {
945        if self.closing {
946            return Err(ProtocolError::Closing);
947        }
948
949        let (client_cnx, server_cnx) = ConnectionBase::create_local_transport_pipe(user, peer_pubk);
950
951        let bpi = BrokerPeerInfo {
952            last_peer_advert: None,
953            connected: PeerConnection::Local(LocalTransport {
954                client_peer_id: peer_pubk,
955                client_cnx,
956                server_cnx,
957            }),
958        };
959
960        self.peers.insert((Some(user), None), bpi);
961        Ok(())
962    }
963
964    pub async fn connect(
965        &mut self,
966        cnx: Arc<Box<dyn IConnect>>,
967        peer_privk: PrivKey,
968        peer_pubk: PubKey,
969        remote_peer_id: DirectPeerId,
970        config: StartConfig,
971    ) -> Result<(), ProtocolError> {
972        if self.closing {
973            return Err(ProtocolError::Closing);
974        }
975
976        log_debug!("CONNECTING");
977        let remote_peer_id_dh = remote_peer_id.to_dh_from_ed();
978
979        // checking if already connected
980        if config.is_keep_alive() {
981            let already = self
982                .peers
983                .get(&(config.get_user(), Some(*remote_peer_id_dh.slice())));
984            if already.is_some() {
985                match already.unwrap().connected {
986                    PeerConnection::NONE => {}
987                    _ => {
988                        return Err(ProtocolError::PeerAlreadyConnected);
989                    }
990                };
991            }
992            //TODO, if Core, check that IP is not in self.direct_connections
993        }
994
995        let mut connection = cnx
996            .open(
997                config.get_url(),
998                peer_privk.clone(),
999                peer_pubk,
1000                remote_peer_id_dh,
1001                config.clone(),
1002            )
1003            .await?;
1004
1005        if !config.is_keep_alive() {
1006            return Ok(());
1007        }
1008
1009        let join = connection.take_shutdown();
1010
1011        let connected = match &config {
1012            StartConfig::Core(config) => {
1013                let dc = DirectConnection {
1014                    addr: config.addr,
1015                    remote_peer_id: *remote_peer_id_dh.slice(),
1016                    tp: connection.transport_protocol(),
1017                    cnx: connection,
1018                };
1019                self.direct_connections.insert(config.addr, dc);
1020                PeerConnection::Core(config.addr)
1021            }
1022            StartConfig::Client(_) | StartConfig::App(_) => PeerConnection::Client(connection),
1023            _ => unimplemented!(),
1024        };
1025
1026        let bpi = BrokerPeerInfo {
1027            last_peer_advert: None,
1028            connected,
1029        };
1030
1031        self.peers
1032            .insert((config.get_user(), Some(*remote_peer_id_dh.slice())), bpi);
1033
1034        async fn watch_close(
1035            mut join: Receiver<Either<NetError, X25519PrivKey>>,
1036            _cnx: Arc<Box<dyn IConnect>>,
1037            _peer_privk: PrivKey,
1038            _peer_pubkey: PubKey,
1039            remote_peer_id: [u8; 32],
1040            config: StartConfig,
1041            mut local_broker: UnboundedSender<LocalBrokerMessage>,
1042        ) -> ResultSend<()> {
1043            async move {
1044                let res = join.next().await;
1045                log_info!("SOCKET IS CLOSED {:?} {:?}", res, remote_peer_id);
1046                if res.is_some()
1047                    && res.as_ref().unwrap().is_left()
1048                    && res.unwrap().unwrap_left() != NetError::Closing
1049                {
1050                    // we intend to reconnect
1051                    let mut broker = BROKER.write().await;
1052                    broker.reconnecting(remote_peer_id, config.get_user());
1053                    // TODO: deal with cycle error https://users.rust-lang.org/t/recursive-async-method-causes-cycle-error/84628/5
1054                    // there is async_recursion now. use that
1055                    // use a channel and send the reconnect job to it.
1056                    // create a spawned loop to read the channel and process the reconnection requests.
1057                    // let result = broker
1058                    //     .connect(cnx, ip, core, peer_pubk, peer_privk, remote_peer_id)
1059                    //     .await;
1060                    // log_debug!("SOCKET RECONNECTION {:?} {:?}", result, &remote_peer_id);
1061                    // TODO: deal with error and incremental backoff
1062
1063                    // TODO: incremental reconnections: after 5sec, +10sec, +20sec, +30sec
1064
1065                    // if all attempts fail :
1066                    if let Some(user) = config.get_user() {
1067                        let _ = local_broker
1068                            .send(LocalBrokerMessage::Disconnected { user_id: user })
1069                            .await;
1070                    }
1071                } else {
1072                    log_debug!("REMOVED");
1073                    BROKER
1074                        .write()
1075                        .await
1076                        .remove_peer_id(remote_peer_id, config.get_user())
1077                        .await;
1078                }
1079            }
1080            .await;
1081            Ok(())
1082        }
1083        spawn_and_log_error(watch_close(
1084            join,
1085            cnx,
1086            peer_privk,
1087            peer_pubk,
1088            *remote_peer_id_dh.slice(),
1089            config,
1090            self.get_local_broker()?,
1091        ));
1092        Ok(())
1093    }
1094
1095    pub async fn request<
1096        A: Into<ProtocolMessage> + std::fmt::Debug + Sync + Send + 'static,
1097        B: TryFrom<ProtocolMessage, Error = ProtocolError> + std::fmt::Debug + Sync + Send + 'static,
1098    >(
1099        &self,
1100        user: &Option<UserId>,
1101        remote_peer_id: &Option<DirectPeerId>, // None means local
1102        msg: A,
1103    ) -> Result<SoS<B>, NgError> {
1104        let bpi = self
1105            .peers
1106            .get(&(*user, remote_peer_id.map(|rpi| rpi.to_dh_slice())))
1107            .ok_or(NgError::ConnectionNotFound)?;
1108        match &bpi.connected {
1109            PeerConnection::Client(cnx) => cnx.request(msg).await,
1110            PeerConnection::Local(lt) => lt.client_cnx.request(msg).await,
1111            _ => Err(NgError::BrokerError),
1112        }
1113    }
1114
1115    #[cfg(not(target_arch = "wasm32"))]
1116    fn get_fsm_for_client(&self, client: &ClientPeerId) -> Option<Arc<Mutex<NoiseFSM>>> {
1117        match client {
1118            ClientPeerId::Local((user, _)) => {
1119                if let Some(BrokerPeerInfo {
1120                    connected:
1121                        PeerConnection::Local(LocalTransport {
1122                            server_cnx: ConnectionBase { fsm: Some(fsm), .. },
1123                            ..
1124                        }),
1125                    ..
1126                }) = self.peers.get(&(Some(*user), None))
1127                {
1128                    Some(Arc::clone(fsm))
1129                } else {
1130                    None
1131                }
1132            }
1133            ClientPeerId::Remote(peer) => {
1134                if let Some(BrokerPeerInfo {
1135                    connected: PeerConnection::Client(ConnectionBase { fsm: Some(fsm), .. }),
1136                    ..
1137                }) = self.peers.get(&(None, Some(peer.to_dh())))
1138                {
1139                    Some(Arc::clone(fsm))
1140                } else {
1141                    None
1142                }
1143            }
1144        }
1145    }
1146
1147    #[cfg(not(target_arch = "wasm32"))]
1148    pub(crate) async fn dispatch_event(
1149        &self,
1150        overlay: &OverlayId,
1151        event: Event,
1152        user_id: &UserId,
1153        remote_peer: &PubKey,
1154    ) -> Result<Vec<ClientPeerId>, ServerError> {
1155        // TODO: deal with subscriptions on the outer overlay. for now we assume everything is on the inner overlay
1156
1157        let mut clients_to_remove = vec![];
1158
1159        let peers_for_local_dispatch = {
1160            self.get_server_broker()?
1161                .read()
1162                .await
1163                .dispatch_event(overlay, event.clone(), user_id, remote_peer)
1164                .await?
1165        };
1166
1167        for client in peers_for_local_dispatch {
1168            log_debug!("dispatch_event peer {:?}", client);
1169            if let Some(fsm) = self.get_fsm_for_client(&client) {
1170                log_debug!("ForwardedEvent peer {:?}", client);
1171                let _ = fsm
1172                    .lock()
1173                    .await
1174                    .send(ProtocolMessage::ClientMessage(ClientMessage::V0(
1175                        ClientMessageV0 {
1176                            overlay: *overlay,
1177                            padding: vec![],
1178                            content: ClientMessageContentV0::ForwardedEvent(event.clone()),
1179                        },
1180                    )))
1181                    .await;
1182            } else {
1183                // we remove the peer from all local_subscriptions
1184                clients_to_remove.push(client);
1185            }
1186        }
1187
1188        Ok(clients_to_remove)
1189    }
1190
1191    #[doc(hidden)]
1192    pub async fn close_peer_connection_x(
1193        &mut self,
1194        peer_id: Option<X25519PubKey>,
1195        user: Option<PubKey>,
1196    ) {
1197        if let Some(peer) = self.peers.get_mut(&(user, peer_id)) {
1198            match &mut peer.connected {
1199                PeerConnection::Core(_) => {
1200                    //TODO
1201                    unimplemented!();
1202                }
1203                PeerConnection::Client(cb) => {
1204                    cb.close().await;
1205                }
1206                PeerConnection::NONE => {}
1207                PeerConnection::Local(lt) => {
1208                    assert!(peer_id.is_none());
1209                    assert!(user.is_some());
1210                    lt.close().await;
1211                    if self.peers.remove(&(user, None)).is_some() {
1212                        log_debug!(
1213                            "Local transport connection closed ! {}",
1214                            user.unwrap().to_string()
1215                        );
1216                    }
1217                }
1218            }
1219            //self.peers.remove(peer_id); // this is done in the watch_close instead
1220        }
1221    }
1222
1223    pub async fn close_peer_connection(&mut self, peer_id: &DirectPeerId, user: Option<PubKey>) {
1224        self.close_peer_connection_x(Some(peer_id.to_dh_slice()), user)
1225            .await
1226    }
1227
1228    async fn close_anonymous(
1229        &mut self,
1230        remote_bind_address: BindAddress,
1231        local_bind_address: BindAddress,
1232    ) {
1233        if let Some(cb) = self
1234            .anonymous_connections
1235            .get_mut(&(local_bind_address, remote_bind_address))
1236        {
1237            cb.close().await;
1238        }
1239    }
1240
1241    #[doc(hidden)]
1242    pub fn print_status(&self) {
1243        self.peers.iter().for_each(|(peer_id, peer_info)| {
1244            log_info!("PEER in BROKER {:?} {:?}", peer_id, peer_info);
1245        });
1246        self.direct_connections.iter().for_each(|(ip, direct_cnx)| {
1247            log_info!("direct_connection in BROKER {:?} {:?}", ip, direct_cnx);
1248        });
1249        self.anonymous_connections.iter().for_each(|(binds, cb)| {
1250            log_info!(
1251                "ANONYMOUS remote {:?} local {:?} {:?}",
1252                binds.1,
1253                binds.0,
1254                cb
1255            );
1256        });
1257    }
1258}