dittolive-ditto 3.0.0-alpha2

Ditto is a peer to peer cross-platform database that allows mobile, web, IoT and server apps to sync with or without an internet connection.
Documentation
use std::{any::Any, collections::HashMap, sync::Arc};

use ffi_sdk::{ffi_utils::char_p, BoxedDitto};

use crate::{
    identity::SharedIdentity,
    utils::prelude::{DittoError, ErrorKind},
};

use super::{
    sync_state::SyncState,
    transport_config::{LanConfig, TransportConfig},
};

/// Handles for the various transports operating on the other side of the
/// FFI interface.
///
/// In other SDKs this struct is named *Sync*; the *Transport* prefix was
/// added here to avoid a name conflict with `::Sync`.
pub struct TransportSync {
    /// Shared Ditto handle passed to the FFI calls made by this type.
    ditto: Arc<BoxedDitto>,
    /// The effective state currently in use. Initially, it is built from the
    /// default empty `TransportConfig`.
    effective_state: SyncState,
    /// Parameters from which the effective state is computed.
    requested_state: SyncState,
    /// Handles of the static TCP clients, keyed by the address they were
    /// started with. Removing an entry drops its handle.
    pub(crate) tcp_clients: HashMap<String, Box<dyn Any + Send + Sync>>,
    /// Handles of the WebSocket clients, keyed by the URL they were started
    /// with. Removing an entry drops its handle.
    pub(crate) ws_clients: HashMap<String, Box<dyn Any + Send + Sync>>,
    /// Handle of the mDNS client transport, set while LAN sync runs with mDNS
    /// on a supported target.
    pub(crate) mdns_client_transport: Option<Box<dyn Any + Send + Sync>>,
    /// Handle of the mDNS advertiser, set alongside `mdns_client_transport`.
    pub(crate) mdns_server_advertiser: Option<Box<dyn Any + Send + Sync>>,
    /// Handle of the BLE client transport, set while Bluetooth runs on a
    /// supported target.
    pub(crate) ble_client_transport: Option<Box<dyn Any + Send + Sync>>,
    /// Handle of the BLE server transport, set alongside `ble_client_transport`.
    pub(crate) ble_server_transport: Option<Box<dyn Any + Send + Sync>>,
}

// All transport-modifying methods here take `&mut self`, because either the
// main executable thread or an automated process such as the ValidityListener
// may try to change the transports at any time. This forces both to first
// acquire a write lock on the `TransportSync` struct before mutating the
// current state of the transports or of the `TransportConfig`.
impl TransportSync {
    pub(crate) fn from_config(
        config: TransportConfig,
        ditto: Arc<BoxedDitto>,
        identity: SharedIdentity,
    ) -> TransportSync {
        let requested_state = SyncState::new(config, identity.clone());
        let effective_state = SyncState::new(TransportConfig::new(), identity);
        TransportSync {
            ditto,
            requested_state,
            effective_state,
            tcp_clients: HashMap::with_capacity(0), // no need to allocate until actually stateured
            ws_clients: HashMap::with_capacity(0),
            mdns_client_transport: None,
            mdns_server_advertiser: None,
            ble_client_transport: None,
            ble_server_transport: None,
        }
    }

    /// Marks sync as requested and re-applies the transport state.
    pub(crate) fn start_sync(&mut self) {
        let requested = &mut self.requested_state;
        requested.set_sync(true);
        self.update();
    }

    /// Marks sync as no longer requested and re-applies the transport state.
    pub(crate) fn stop_sync(&mut self) {
        let requested = &mut self.requested_state;
        requested.set_sync(false);
        self.update();
    }

    /// Records the latest validity flags and re-applies the transport state.
    ///
    /// * `web_valid` - forwarded to `SyncState::set_web_valid`
    /// * `x509_valid` - forwarded to `SyncState::set_x509_valid`
    pub(crate) fn validity_updated(&mut self, web_valid: bool, x509_valid: bool) {
        let requested = &mut self.requested_state;
        requested
            .set_web_valid(web_valid)
            .set_x509_valid(x509_valid);
        self.update();
    }

    /// Replaces the requested `TransportConfig` with `state` and re-applies
    /// the transport state.
    pub(crate) fn set_transport_config(&mut self, state: TransportConfig) {
        let requested = &mut self.requested_state;
        requested.set_config(state);
        self.update();
    }

    /// Brings the transport layer in line with the requested parameters.
    ///
    /// The next effective state is computed from the requested state, the
    /// differences against the current effective state are applied to the
    /// transports, and the effective configuration is then updated to match.
    fn update(&mut self) {
        let next = self.requested_state.compute_effective_state();
        // Snapshot of what is currently running, so the apply steps can borrow
        // `self` mutably while diffing against it.
        let previous = self.effective_state.clone();

        self.apply_transport_state(&next, &previous);
        self.apply_transport_global_state(&next, &previous);

        self.effective_state = next;
    }

    fn apply_transport_state(&mut self, state: &SyncState, old_state: &SyncState) {
        // Diff the two states transport-by-transport
        // TCP and LAN are interrelated
        self.update_peer_to_peer_bluetooth_le(state, old_state);
        self.update_peer_to_peer_lan(state, old_state);
        self.update_listen_tcp(state, old_state);
        self.update_listen_http(state, old_state);
        self.update_connect_tcp_servers(state, old_state);
        self.update_connect_websocket_url(state, old_state);
    }

    /// Returns the configuration of the requested (user-provided) state.
    pub(crate) fn current_config(&self) -> &TransportConfig {
        let requested = &self.requested_state;
        requested.config()
    }

    /// Returns the configuration of the effective state (test builds only).
    #[cfg(test)]
    pub(crate) fn effective_config(&self) -> &TransportConfig {
        let effective = &self.effective_state;
        effective.config()
    }

    /// Pushes the sync group across the FFI when it differs between
    /// `old_state` and `state`; does nothing otherwise.
    fn apply_transport_global_state(&self, state: &SyncState, old_state: &SyncState) {
        let requested_group = state.config().global.sync_group;
        let current_group = old_state.config().global.sync_group;
        if requested_group == current_group {
            return;
        }

        // NOTE(review): soundness relies on `self.ditto` being a live handle
        // for the duration of the call - confirm against the ffi_sdk contract.
        unsafe { ffi_sdk::ditto_set_sync_group(&self.ditto, requested_group) };
    }

    /// Starts a TCP server that listens for connections from other peers.
    ///
    /// The bind address is `config.interface_ip` and `config.port` joined as
    /// `ip:port`. Generally this also requires an update to the relevant DNS
    /// zone file for the server to be discoverable by other peers.
    fn start_tcp_listen(&mut self, config: &crate::transport::TcpListenConfig) {
        let bind_address = char_p::new(format!("{}:{}", config.interface_ip, config.port));
        // TODO: convert this result to a public error type instead of discarding it.
        let _status = unsafe {
            ffi_sdk::ditto_start_tcp_server(&self.ditto, Some(bind_address.as_ref()))
        };
    }

    /// Asks the FFI layer to stop the TCP server.
    fn stop_tcp_listen(&mut self) {
        let ditto = &self.ditto;
        unsafe { ffi_sdk::ditto_stop_tcp_server(ditto) };
    }

    /// Starts an HTTP server that other devices will be able to connect to.
    ///
    /// All settings are read from `config`:
    /// * `interface_ip` / `port` - joined as `ip:port`, the address on which
    ///   to listen for connections
    /// * `websocket_sync` - if true, the websocket transport is enabled
    /// * `static_content_path` - optional root of web content directory
    /// * `tls_certificate_path` - optional x509 certificate for the web server
    ///   (also requires key)
    /// * `tls_key_path` - optional TLS private key, required if cert is
    ///   provided
    ///
    /// # Errors
    ///
    /// Returns a `DittoError` built from `ErrorKind::InvalidInput` when the
    /// FFI call reports a non-zero status.
    fn start_http_listen(
        &mut self,
        config: &crate::transport::HttpListenConfig,
    ) -> Result<(), DittoError> {
        let websocket_mode = if config.websocket_sync {
            ffi_sdk::WebSocketMode::Enabled
        } else {
            ffi_sdk::WebSocketMode::Disabled
        };

        // The C strings must outlive the FFI call, so they are bound to locals
        // and only borrowed at the call site.
        let bind_address = char_p::new(format!("{}:{}", config.interface_ip, config.port));
        let static_root = config
            .static_content_path
            .as_ref()
            .map(|path| char_p::new(path.to_string_lossy().into_owned()));
        let tls_cert = config
            .tls_certificate_path
            .as_ref()
            .map(|path| char_p::new(path.to_string_lossy().into_owned()));
        let tls_key = config
            .tls_key_path
            .as_ref()
            .map(|path| char_p::new(path.to_string_lossy().into_owned()));

        let status = unsafe {
            ffi_sdk::ditto_start_http_server(
                &self.ditto,
                Some(bind_address.as_ref()),
                static_root.as_ref().map(|c_str| c_str.as_ref()),
                websocket_mode,
                tls_cert.as_ref().map(|c_str| c_str.as_ref()),
                tls_key.as_ref().map(|c_str| c_str.as_ref()),
            )
        };

        match status {
            0 => Ok(()),
            _ => Err(DittoError::from_ffi(ErrorKind::InvalidInput)),
        }
    }

    /// Asks the FFI layer to stop the HTTP server.
    fn stop_http_listen(&mut self) {
        let ditto = &self.ditto;
        unsafe { ffi_sdk::ditto_stop_http_server(ditto) };
    }

    fn start_tcp_connect(&mut self, address: String) {
        let addr = char_p::new(address.clone());
        // this handle stores a tx entangled with an rx across the FFI boundary which
        // will drop if all tx elements drop
        let tcp_client_handle =
            unsafe { ffi_sdk::ditto_add_static_tcp_client(&self.ditto, addr.as_ref()) };
        ::log::info!("Static TCP client transport {:?} started", &address);
        self.tcp_clients
            .insert(address, Box::new(tcp_client_handle));
    }

    /// Removes the static TCP client handle stored for `address`, if any, and
    /// drops it.
    fn stop_tcp_connect(&mut self, address: &str) {
        drop(self.tcp_clients.remove(address));
    }

    fn start_ws_connect(&mut self, url: String) {
        let c_url = char_p::new(url.clone());
        let ws_client_handle =
            unsafe { ffi_sdk::ditto_add_websocket_client(&self.ditto, c_url.as_ref()) };
        ::log::info!("Websocket client transport {:?} started", &url);
        self.ws_clients.insert(url, Box::new(ws_client_handle));
    }

    /// Removes the WebSocket client handle stored for `url`, if any, and
    /// drops it.
    fn stop_ws_connect(&mut self, url: &str) {
        drop(self.ws_clients.remove(url));
    }

    /// Starts the BLE client and server transports and stores their handles.
    ///
    /// The transports are only added on targets matched by the `cfg` guard
    /// (non-musl Linux and Windows); on any other target this method only
    /// emits the final log line.
    fn start_bluetooth(&mut self) {
        // BlueZ cfg guard
        #[cfg(any(
            all(target_os = "linux", not(target_env = "musl")),
            target_os = "windows"
        ))]
        {
            let client =
                unsafe { ffi_sdk::ditto_add_internal_ble_client_transport(&self.ditto) };
            ::log::info!("BLE client transport started");
            self.ble_client_transport = Some(Box::new(client));

            let server =
                unsafe { ffi_sdk::ditto_add_internal_ble_server_transport(&self.ditto) };
            ::log::info!("BLE server transport started");
            self.ble_server_transport = Some(Box::new(server));
        }
        ::log::info!("handling BLE transport");
    }

    /// Takes both BLE handles out of `self` and drops them.
    ///
    /// Both slots are emptied first; the server handle is then dropped before
    /// the client handle.
    fn stop_bluetooth(&mut self) {
        let client = self.ble_client_transport.take();
        let server = self.ble_server_transport.take();
        drop(server);
        drop(client);
    }

    /// Starts the LAN transports selected by `config`.
    ///
    /// * `config` - `mdns_enabled` and `multicast_enabled` select which LAN
    ///   transports are added
    /// * `lan_controls_server` - when true, a TCP server is started without
    ///   an explicit bind address before the LAN transports are added
    ///
    /// The mDNS client and advertiser are only created on targets matched by
    /// the `cfg` guard (non-musl Linux and Windows).
    fn start_lan(&mut self, config: &LanConfig, lan_controls_server: bool) {
        if lan_controls_server {
            unsafe { ffi_sdk::ditto_start_tcp_server(&self.ditto, None) };
        }

        if config.mdns_enabled {
            #[cfg(any(
                all(target_os = "linux", not(target_env = "musl")),
                target_os = "windows"
            ))]
            {
                let mdns_client =
                    unsafe { ffi_sdk::ditto_add_internal_mdns_transport(&self.ditto) };
                ::log::info!("Mdns client transport started");
                self.mdns_client_transport = Some(Box::new(mdns_client));

                let mdns_advertiser =
                    unsafe { ffi_sdk::ditto_add_internal_mdns_advertiser(&self.ditto) };
                ::log::info!("Mdns advertiser started");
                self.mdns_server_advertiser = Some(Box::new(mdns_advertiser));
            }
        }

        if config.multicast_enabled {
            unsafe { ffi_sdk::ditto_add_multicast_transport(&self.ditto) };
        }
    }

    /// Stops the LAN transports.
    ///
    /// * `lan_controls_server` - when true, the TCP server is stopped first
    ///
    /// Both mDNS slots are emptied, the multicast transport is removed, and
    /// only then are the mDNS handles dropped (advertiser first, then client).
    fn stop_lan(&mut self, lan_controls_server: bool) {
        if lan_controls_server {
            unsafe { ffi_sdk::ditto_stop_tcp_server(&self.ditto) };
        }

        let mdns_client = self.mdns_client_transport.take();
        let mdns_advertiser = self.mdns_server_advertiser.take();
        unsafe { ffi_sdk::ditto_remove_multicast_transport(&self.ditto) };

        drop(mdns_advertiser);
        drop(mdns_client);
    }
}

/// This impl contains the update routines for the individual transport
/// protocols; each one diffs the old and new state for a single transport.
impl TransportSync {
    /// Restarts the LAN transports when either the TCP listener settings or
    /// the LAN settings changed.
    ///
    /// LAN controls the TCP server only while the explicit TCP listener is
    /// disabled, so that flag is derived from `old_state` for the stop and
    /// from `state` for the start.
    fn update_peer_to_peer_lan(&mut self, state: &SyncState, old_state: &SyncState) {
        let new_config = state.config();
        let old_config = old_state.config();

        let tcp_changed = new_config.listen.tcp != old_config.listen.tcp;
        let lan_changed = new_config.peer_to_peer.lan != old_config.peer_to_peer.lan;
        if !(tcp_changed || lan_changed) {
            return;
        }

        let lan_stops_server = !old_config.listen.tcp.enabled;
        self.stop_lan(lan_stops_server);

        let lan_config = &new_config.peer_to_peer.lan;
        if lan_config.enabled {
            let lan_starts_server = !new_config.listen.tcp.enabled;
            self.start_lan(lan_config, lan_starts_server);
        }
    }

    fn update_listen_tcp(&mut self, state: &SyncState, old_state: &SyncState) {
        if state.config().listen.tcp != old_state.config().listen.tcp {
            self.stop_tcp_listen();
            if state.config().listen.tcp.enabled {
                self.start_tcp_listen(&state.config().listen.tcp);
            }
        }
    }

    fn update_listen_http(&mut self, state: &SyncState, old_state: &SyncState) {
        if state.config().listen.http != old_state.config().listen.http {
            self.stop_http_listen();
            if state.config().listen.http.enabled {
                let _ = self.start_http_listen(&state.config().listen.http);
            }
        }
    }

    /// Reconciles the static TCP clients with the new set of servers.
    ///
    /// Addresses present only in `old_state` are stopped first; addresses
    /// present only in `state` are then started. Addresses in both sets are
    /// left untouched.
    fn update_connect_tcp_servers(&mut self, state: &SyncState, old_state: &SyncState) {
        let wanted = &state.config().connect.tcp_servers;
        let running = &old_state.config().connect.tcp_servers;

        for stale_address in running.difference(wanted) {
            self.stop_tcp_connect(stale_address);
        }

        for new_address in wanted.difference(running) {
            self.start_tcp_connect(new_address.clone());
        }
    }

    /// Starts or stops Bluetooth LE when its `enabled` flag flipped between
    /// `old_state` and `state`; an unchanged flag is a no-op.
    fn update_peer_to_peer_bluetooth_le(&mut self, state: &SyncState, old_state: &SyncState) {
        let was_enabled = old_state.config().peer_to_peer.bluetooth_le.enabled;
        let is_enabled = state.config().peer_to_peer.bluetooth_le.enabled;

        match (was_enabled, is_enabled) {
            (true, false) => self.stop_bluetooth(),
            (false, true) => self.start_bluetooth(),
            _ => {}
        }
    }

    /// Reconciles the WebSocket clients with the new set of URLs.
    ///
    /// URLs present only in `old_state` are stopped first; URLs present only
    /// in `state` are then started. URLs in both sets are left untouched.
    fn update_connect_websocket_url(&mut self, state: &SyncState, old_state: &SyncState) {
        let wanted = &state.config().connect.websocket_urls;
        let running = &old_state.config().connect.websocket_urls;

        for stale_url in running.difference(wanted) {
            self.stop_ws_connect(stale_url);
        }

        for new_url in wanted.difference(running) {
            self.start_ws_connect(new_url.clone());
        }
    }
}