dittolive-ditto 4.9.2

Ditto is a peer-to-peer, cross-platform database that allows mobile, web, IoT, and server apps to sync with or without an internet connection.
Documentation
use std::{any::Any, collections::HashMap, ffi::c_void, sync::Arc};

use ffi_sdk::{ffi_utils::char_p, BoxedDitto, DittoSdkTransportsError};
use safer_ffi::prelude::*;
use tracing::info;

use super::{
    sync_state::SyncState,
    transport_config::{LanConfig, TransportConfig},
};
use crate::{
    identity::SharedIdentity,
    utils::prelude::{DittoError, ErrorKind},
};

/// Handles for the various transports operating on the other side of the FFI
/// interface.
///
/// In other SDKs, this struct is named *Sync*. To avoid a name conflict with
/// the `::Sync` marker trait, it is prefixed with *Transport* here.
pub struct TransportSync {
    /// Shared Ditto instance handed to the FFI calls made by this type.
    ditto: Arc<BoxedDitto>,
    /// The effective state currently in use. Initially, it is the default empty TransportConfig.
    effective_state: SyncState,
    /// Parameters from which the effective state is computed.
    requested_state: SyncState,
    /// Type-erased static TCP client handles, keyed by server address.
    /// Removing an entry drops its handle.
    pub(crate) tcp_clients: HashMap<String, Box<dyn Any + Send + Sync>>,
    /// Type-erased websocket client handles, keyed by URL.
    /// Removing an entry drops its handle.
    pub(crate) ws_clients: HashMap<String, Box<dyn Any + Send + Sync>>,
    /// Handle stored after `ditto_sdk_transports_lan_create` succeeds; taken
    /// and destroyed when the LAN transport is stopped.
    mdns_transport_handle: Option<MdnsTransportHandleHolder>,
    /// Handle stored after `ditto_sdk_transports_ble_create` succeeds; taken
    /// and destroyed when the BLE transport is stopped.
    ble_transport_handle: Option<BleTransportHandleHolder>,
}

/// Wrapper around the raw pointer returned by
/// `ffi_sdk::ditto_sdk_transports_lan_create`, kept until it is handed to
/// `ffi_sdk::ditto_sdk_transports_lan_destroy`.
struct MdnsTransportHandleHolder(*mut c_void);
// NOTE(review): raw pointers are neither `Send` nor `Sync` by default. These
// impls assert that the handle may be moved to, and shared between, threads;
// nothing visible in this file establishes that, so confirm it against the
// FFI contract.
unsafe impl Send for MdnsTransportHandleHolder {}
unsafe impl Sync for MdnsTransportHandleHolder {}

/// Wrapper around the raw pointer returned by
/// `ffi_sdk::ditto_sdk_transports_ble_create`, kept until it is handed to
/// `ffi_sdk::ditto_sdk_transports_ble_destroy`.
struct BleTransportHandleHolder(*mut c_void);
// NOTE(review): raw pointers are neither `Send` nor `Sync` by default. These
// impls assert that the handle may be moved to, and shared between, threads;
// nothing visible in this file establishes that, so confirm it against the
// FFI contract.
unsafe impl Send for BleTransportHandleHolder {}
unsafe impl Sync for BleTransportHandleHolder {}

// We make all transport-modifying methods here mutable since either the main
// executable thread or an automated process like the ValidityListener may try
// to change the transports at any time. This forces both to first acquire a
// WriteLock on the Transports struct before mutating the current state of the
// Transports or TransportConfig
impl TransportSync {
    pub(crate) fn from_config(
        config: TransportConfig,
        ditto: Arc<BoxedDitto>,
        identity: SharedIdentity,
    ) -> TransportSync {
        let web_valid = ffi_sdk::ditto_auth_client_is_web_valid(&ditto) != 0;
        let x509_valid = ffi_sdk::ditto_auth_client_is_x509_valid(&ditto) != 0;
        let requested_state = SyncState::new(config, identity.clone(), web_valid, x509_valid);
        let effective_state =
            SyncState::new(TransportConfig::new(), identity, web_valid, x509_valid);

        let mut error = DittoSdkTransportsError::None;
        unsafe { ffi_sdk::ditto_sdk_transports_init((&mut error).into()) };
        if error != DittoSdkTransportsError::None {
            tracing::error!(?error, "failed to perform transports init");
        }

        TransportSync {
            ditto,
            requested_state,
            effective_state,
            tcp_clients: HashMap::with_capacity(0), // no need to allocate until actually started
            ws_clients: HashMap::with_capacity(0),
            mdns_transport_handle: None,
            ble_transport_handle: None,
        }
    }

    /// Marks sync as requested, then recomputes and applies the effective
    /// state.
    pub(crate) fn start_sync(&mut self) {
        let requested = &mut self.requested_state;
        requested.set_sync(true);
        self.update();
    }

    /// Marks sync as no longer requested, then recomputes and applies the
    /// effective state.
    pub(crate) fn stop_sync(&mut self) {
        let requested = &mut self.requested_state;
        requested.set_sync(false);
        self.update();
    }

    /// Records the new web and x509 validity flags in the requested state,
    /// then recomputes and applies the effective state.
    pub(crate) fn validity_updated(&mut self, web_valid: bool, x509_valid: bool) {
        let requested = &mut self.requested_state;
        requested.set_web_valid(web_valid).set_x509_valid(x509_valid);
        self.update();
    }

    /// Replaces the requested transport configuration with `state`, applies
    /// it, and then passes its CBOR encoding to
    /// `ditto_small_peer_info_set_transport_config_data`.
    ///
    /// # Panics
    ///
    /// Panics if `state` cannot be serialized to CBOR.
    pub(crate) fn set_transport_config(&mut self, state: TransportConfig) {
        // Encode first: `state` is moved into the requested state below.
        let encoded =
            serde_cbor::to_vec(&state).expect("TransportConfig can be serialized to CBOR");

        self.requested_state.set_config(state);
        self.update();

        let payload = encoded.as_slice();
        ffi_sdk::ditto_small_peer_info_set_transport_config_data(&self.ditto, payload.into());
    }

    /// Brings the transport layer in line with the requested parameters.
    ///
    /// Computes the next effective state from the requested state, applies
    /// the per-transport and global differences against the previous
    /// effective state, and finally stores the new state as the effective one.
    fn update(&mut self) {
        let next = self.requested_state.compute_effective_state();
        // Clone the previous state so `self` can be borrowed mutably while
        // the differences are applied.
        let previous = self.effective_state.clone();

        self.apply_transport_state(&next, &previous);
        self.apply_transport_global_state(&next, &previous);

        self.effective_state = next;
    }

    fn apply_transport_state(&mut self, state: &SyncState, old_state: &SyncState) {
        // Diff the two states transport-by-transport
        // TCP and LAN are interrelated
        self.update_peer_to_peer_bluetooth_le(state, old_state);
        self.update_peer_to_peer_lan(state, old_state);
        self.update_listen_tcp(state, old_state);
        self.update_listen_http(state, old_state);
        self.update_connect_tcp_servers(state, old_state);
        self.update_connect_websocket_url(state, old_state);
        self.update_connect_retry_interval(state, old_state);
    }

    /// Returns the configuration provided by the user, i.e. the one held by
    /// the requested state.
    pub(crate) fn current_config(&self) -> &TransportConfig {
        let requested = &self.requested_state;
        requested.config()
    }

    /// Returns the configuration held by the effective state (test builds
    /// only).
    #[cfg(test)]
    pub(crate) fn effective_config(&self) -> &TransportConfig {
        let effective = &self.effective_state;
        effective.config()
    }

    /// Applies global (non per-transport) settings that differ between
    /// `state` and `old_state`; currently that is only the sync group.
    fn apply_transport_global_state(&self, state: &SyncState, old_state: &SyncState) {
        let requested_group = state.config().global.sync_group;
        let previous_group = old_state.config().global.sync_group;

        // Nothing to push across the FFI when the group is unchanged.
        if requested_group == previous_group {
            return;
        }
        ffi_sdk::ditto_set_sync_group(&self.ditto, requested_group);
    }

    /// Starts a TCP server that listens for connections from other peers on
    /// `config.interface_ip:config.port`.
    ///
    /// Generally also requires an update to the relevant DNS zone file to be
    /// discoverable by other peers.
    fn start_tcp_listen(&mut self, config: &crate::transport::TcpListenConfig) {
        let c_bind_addr = char_p::new(format!(
            "{ip}:{port}",
            ip = config.interface_ip,
            port = config.port
        ));
        // NOTE(review): the outcome of the FFI call is currently discarded;
        // converting it to a public error type is still pending.
        let _outcome = ffi_sdk::ditto_start_tcp_server(&self.ditto, Some(c_bind_addr.as_ref()));
    }

    /// Asks the FFI layer to stop the TCP server.
    fn stop_tcp_listen(&mut self) {
        let ditto = &self.ditto;
        ffi_sdk::ditto_stop_tcp_server(ditto);
    }

    /// Starts an HTTP server that other devices will be able to connect to.
    ///
    /// The following fields of `config` are used:
    /// * `interface_ip` / `port` - joined as `ip:port`, the address to listen on
    /// * `websocket_sync` - if true, the websocket transport is enabled
    /// * `static_content_path` - optional root of the web content directory
    /// * `tls_certificate_path` - optional x509 certificate for the web server
    /// * `tls_key_path` - optional TLS private key
    ///
    /// Paths are converted lossily to UTF-8 before crossing the FFI boundary.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::InvalidInput` error when the FFI call reports a
    /// non-zero status.
    fn start_http_listen(
        &mut self,
        config: &crate::transport::HttpListenConfig,
    ) -> Result<(), DittoError> {
        let websocket_mode = if config.websocket_sync {
            ffi_sdk::WebSocketMode::Enabled
        } else {
            ffi_sdk::WebSocketMode::Disabled
        };

        let c_bind_addr = char_p::new(format!(
            "{ip}:{port}",
            ip = config.interface_ip,
            port = config.port
        ));

        // Owned C strings must outlive the FFI call below, so keep them in
        // named locals and only lend references to the call.
        let c_static_path = config
            .static_content_path
            .as_ref()
            .map(|path| char_p::new(path.to_string_lossy().into_owned()));
        let c_tls_cert_path = config
            .tls_certificate_path
            .as_ref()
            .map(|path| char_p::new(path.to_string_lossy().into_owned()));
        let c_tls_key_path = config
            .tls_key_path
            .as_ref()
            .map(|path| char_p::new(path.to_string_lossy().into_owned()));

        let status = ffi_sdk::ditto_start_http_server(
            &self.ditto,
            Some(c_bind_addr.as_ref()),
            c_static_path.as_ref().map(|owned| owned.as_ref()),
            websocket_mode,
            c_tls_cert_path.as_ref().map(|owned| owned.as_ref()),
            c_tls_key_path.as_ref().map(|owned| owned.as_ref()),
        );

        if status == 0 {
            return Ok(());
        }
        Err(DittoError::from_ffi(ErrorKind::InvalidInput))
    }

    /// Asks the FFI layer to stop the HTTP server.
    fn stop_http_listen(&mut self) {
        let ditto = &self.ditto;
        ffi_sdk::ditto_stop_http_server(ditto);
    }

    fn start_tcp_connect(&mut self, address: String) {
        let addr = char_p::new(address.clone());
        // this handle stores a tx entangled with an rx across the FFI boundary which
        // will drop if all tx elements drop
        let tcp_client_handle = ffi_sdk::ditto_add_static_tcp_client(&self.ditto, addr.as_ref());
        info!(?address, "static TCP client transport started");
        self.tcp_clients
            .insert(address, Box::new(tcp_client_handle));
    }

    /// Removes the static TCP client stored for `address`, if any, and drops
    /// its handle.
    fn stop_tcp_connect(&mut self, address: &str) {
        drop(self.tcp_clients.remove(address));
    }

    fn start_ws_connect(&mut self, url: String, routing_hint: u32) {
        let c_url = char_p::new(url.clone());
        let ws_client_handle =
            ffi_sdk::ditto_add_websocket_client(&self.ditto, c_url.as_ref(), routing_hint);
        info!(?url, "websocket client transport started");
        self.ws_clients.insert(url, Box::new(ws_client_handle));
    }

    /// Removes the websocket client stored for `url`, if any, and drops its
    /// handle.
    fn stop_ws_connect(&mut self, url: &str) {
        drop(self.ws_clients.remove(url));
    }

    fn start_bluetooth(&mut self) {
        let mut error = DittoSdkTransportsError::None;
        let handle =
            unsafe { ffi_sdk::ditto_sdk_transports_ble_create(&self.ditto, (&mut error).into()) };

        if error == DittoSdkTransportsError::None {
            self.ble_transport_handle = Some(BleTransportHandleHolder(handle));
            tracing::info!("BLE transport started");
        } else {
            tracing::error!(?error, "failed to start BLE transport");
        }
    }

    fn stop_bluetooth(&mut self) {
        let Some(handle) = self.ble_transport_handle.take() else {
            return;
        };
        let mut error = DittoSdkTransportsError::None;
        unsafe { ffi_sdk::ditto_sdk_transports_ble_destroy(handle.0, (&mut error).into()) };
        if error != DittoSdkTransportsError::None {
            tracing::error!(?error, "failed to stop LAN transport");
        }
    }

    fn start_lan(&mut self, config: &LanConfig) {
        if config.mdns_enabled {
            tracing::info!("starting LAN transport");

            let mut error = DittoSdkTransportsError::None;

            let handle = unsafe {
                ffi_sdk::ditto_sdk_transports_lan_create(&self.ditto, (&mut error).into())
            };
            if error == DittoSdkTransportsError::None {
                self.mdns_transport_handle = Some(MdnsTransportHandleHolder(handle));
            } else {
                tracing::error!(?error, "failed to start LAN transport");
            }
        }

        if config.multicast_enabled {
            ffi_sdk::ditto_add_multicast_transport(&self.ditto);
        }
    }

    fn stop_lan(&mut self) {
        ffi_sdk::ditto_remove_multicast_transport(&self.ditto);

        let Some(handle) = self.mdns_transport_handle.take() else {
            return;
        };
        let mut error = DittoSdkTransportsError::None;
        unsafe { ffi_sdk::ditto_sdk_transports_lan_destroy(handle.0, (&mut error).into()) };
        if error != DittoSdkTransportsError::None {
            tracing::error!(?error, "failed to stop LAN transport");
        }
    }
}

/// This impl contains update for different transport protocols
impl TransportSync {
    /// Restarts the LAN transports when either the TCP listen configuration
    /// or the LAN configuration differs between `state` and `old_state`.
    ///
    /// The transports are always stopped first and only started again if LAN
    /// is enabled in the new configuration.
    fn update_peer_to_peer_lan(&mut self, state: &SyncState, old_state: &SyncState) {
        let (new_config, old_config) = (state.config(), old_state.config());

        let changed = new_config.listen.tcp != old_config.listen.tcp
            || new_config.peer_to_peer.lan != old_config.peer_to_peer.lan;
        if !changed {
            return;
        }

        self.stop_lan();
        let lan = &new_config.peer_to_peer.lan;
        if lan.enabled {
            self.start_lan(lan);
        }
    }

    /// Restarts the TCP listener when its configuration differs between
    /// `state` and `old_state`.
    ///
    /// The listener is always stopped first and only started again if it is
    /// enabled in the new configuration.
    fn update_listen_tcp(&mut self, state: &SyncState, old_state: &SyncState) {
        let tcp_listen = &state.config().listen.tcp;

        let changed = *tcp_listen != old_state.config().listen.tcp;
        if !changed {
            return;
        }

        self.stop_tcp_listen();
        if tcp_listen.enabled {
            self.start_tcp_listen(tcp_listen);
        }
    }

    fn update_listen_http(&mut self, state: &SyncState, old_state: &SyncState) {
        if state.config().listen.http != old_state.config().listen.http {
            self.stop_http_listen();
            if state.config().listen.http.enabled {
                let _ = self.start_http_listen(&state.config().listen.http);
            }
        }
    }

    /// Reconciles the static TCP clients with the new set of server
    /// addresses.
    ///
    /// Addresses present only in `old_state` are stopped first; addresses
    /// present only in `state` are then started. Addresses present in both
    /// are left untouched.
    fn update_connect_tcp_servers(&mut self, state: &SyncState, old_state: &SyncState) {
        let wanted = &state.config().connect.tcp_servers;
        let running = &old_state.config().connect.tcp_servers;

        for stale in running.difference(wanted) {
            self.stop_tcp_connect(stale);
        }

        for fresh in wanted.difference(running) {
            self.start_tcp_connect(fresh.clone());
        }
    }

    /// Starts or stops the BLE transport when its `enabled` flag flips
    /// between `old_state` and `state`; does nothing otherwise.
    fn update_peer_to_peer_bluetooth_le(&mut self, state: &SyncState, old_state: &SyncState) {
        let was_enabled = old_state.config().peer_to_peer.bluetooth_le.enabled;
        let is_enabled = state.config().peer_to_peer.bluetooth_le.enabled;

        match (was_enabled, is_enabled) {
            (true, false) => self.stop_bluetooth(),
            (false, true) => self.start_bluetooth(),
            // Unchanged flag: nothing to do.
            _ => {}
        }
    }

    /// Reconciles the websocket clients with the new set of URLs.
    ///
    /// URLs present only in `old_state` are stopped first; URLs present only
    /// in `state` are then started with the new global routing hint.
    fn update_connect_websocket_url(&mut self, state: &SyncState, old_state: &SyncState) {
        let wanted = &state.config().connect.websocket_urls;
        let running = &old_state.config().connect.websocket_urls;

        for stale in running.difference(wanted) {
            self.stop_ws_connect(stale);
        }

        // NOTE(review): only URL differences are considered, so a change of
        // `routing_hint` alone does not restart clients whose URL is unchanged.
        let routing_hint = state.config().global.routing_hint;
        for fresh in wanted.difference(running) {
            self.start_ws_connect(fresh.clone(), routing_hint);
        }
    }

    /// Pushes the configured connect retry interval, in milliseconds, to the
    /// FFI layer.
    ///
    /// The value is sent on every call, whether or not it changed; the
    /// previous state is not consulted.
    fn update_connect_retry_interval(&mut self, state: &SyncState, _old_state: &SyncState) {
        let millis = state.config().connect.retry_interval.as_millis();
        // Saturate at `u32::MAX` instead of truncating an oversized interval.
        let retry_interval = u32::try_from(millis).unwrap_or(u32::MAX);

        ffi_sdk::ditto_set_connect_retry_interval(&self.ditto, retry_interval);
    }
}