dittolive-ditto 3.0.0-alpha2

Ditto is a peer-to-peer, cross-platform database that allows mobile, web, IoT, and server apps to sync with or without an internet connection.
Documentation
use_prelude!();

use super::presence_observer::{
    PeersObserver, PeersObserverCtx, PresenceObserver, PresenceObserverCtx, WeakPresenceObserver,
};
use super::v2::V2Presence;
use super::v3::PresenceGraph;
use std::sync::{Mutex, Weak};

/// Fans presence events coming from the Ditto FFI layer out to registered observers.
///
/// Two kinds of observers are tracked side by side: "v2" observers, registered through
/// `add_observer` and fed `V2Presence` values, and "v3" observers, registered through
/// `observe` and fed `PresenceGraph` references.
pub struct Presence {
    /// Handle passed to every `ffi_sdk` presence call made by this type.
    ditto: Arc<ffi_sdk::BoxedDitto>,
    /// Weak handles to the v2 observer contexts; entries whose context has been dropped
    /// are removed while dispatching a v2 event.
    observers_v2: Mutex<Vec<Weak<PeersObserverCtx>>>,
    /// Weak handles to the v3 observer contexts; entries whose context has been dropped
    /// are removed while dispatching a v3 event.
    observers_v3: Mutex<Vec<WeakPresenceObserver>>,
    /// Set to `true` by `subscribe` once the FFI presence callbacks have been registered.
    registered: Mutex<bool>,
}

impl Presence {
    pub(crate) fn new(ditto: Arc<ffi_sdk::BoxedDitto>) -> Self {
        Self {
            ditto,
            observers_v2: Mutex::new(Vec::new()),
            observers_v3: Mutex::new(Vec::new()),
            registered: Mutex::new(false),
        }
    }

    /// `extern "C"` trampoline for v2 presence events.
    ///
    /// `subscribe` registers this function together with a context pointer obtained
    /// from `Weak::<Presence>::into_raw`. The pointer is only *borrowed* here: the
    /// rebuilt `Weak` is wrapped in `ManuallyDrop` so it is not dropped (and the weak
    /// count not decremented) when this call returns. If the `Presence` is still
    /// alive, the JSON payload is forwarded to `on_presence_v2`; otherwise the event
    /// is ignored.
    pub(crate) unsafe extern "C" fn on_event_v2(ctx: *mut c_void, json: char_p::Ref<'_>) {
        let borrowed_weak =
            ::core::mem::ManuallyDrop::new(Weak::from_raw(ctx.cast::<Presence>()));
        if let Some(presence) = borrowed_weak.upgrade() {
            presence.on_presence_v2(json.to_str());
        }
    }
    /// `extern "C"` trampoline for v3 presence events.
    ///
    /// `subscribe` registers this function together with a context pointer obtained
    /// from `Weak::<Presence>::into_raw`. The pointer is only *borrowed* here: the
    /// rebuilt `Weak` is wrapped in `ManuallyDrop` so it is not dropped (and the weak
    /// count not decremented) when this call returns. If the `Presence` is still
    /// alive, the JSON payload is forwarded to `on_presence_v3`; otherwise the event
    /// is ignored.
    pub(crate) unsafe extern "C" fn on_event_v3(ctx: *mut c_void, json: char_p::Ref<'_>) {
        let borrowed_weak =
            ::core::mem::ManuallyDrop::new(Weak::from_raw(ctx.cast::<Presence>()));
        if let Some(presence) = borrowed_weak.upgrade() {
            presence.on_presence_v3(json.to_str());
        }
    }

    fn on_presence_v2(&self, json_str: &str) {
        // v2 presence
        let mut observers_v2 = self.observers_v2.lock().unwrap();
        if observers_v2.is_empty() {
            return;
        }
        if let Ok(presence_v2) = serde_json::from_str::<V2Presence>(json_str) {
            observers_v2.retain(|weak_observer| {
                if let Some(observer) = weak_observer.upgrade() {
                    (observer.on_presence)(presence_v2.clone());
                    true
                } else {
                    false
                }
            })
        }
    }

    fn on_presence_v3(&self, json_str: &str) {
        // v3 presence
        let mut observers_v3 = self.observers_v3.lock().unwrap();
        if observers_v3.is_empty() {
            return;
        }
        if let Ok(presence_graph) = serde_json::from_str(json_str) {
            observers_v3.retain(|weak_observer| {
                if let Some(observer) = weak_observer.upgrade() {
                    (observer.on_presence)(&presence_graph);
                    true
                } else {
                    false
                }
            })
        }
    }

    /// Registers the v2 and v3 presence callbacks with the Ditto FFI layer.
    ///
    /// This is idempotent: the `registered` flag is checked and set under a single
    /// lock acquisition, so when several threads add their first observer at the same
    /// time only one of them performs the registration. Without this, callers that
    /// test the flag before calling `subscribe` could race and register the callbacks
    /// more than once.
    fn subscribe(self: &Arc<Self>) {
        let mut registered = self.registered.lock().unwrap();
        if *registered {
            return;
        }

        // Each registration gets its own `Weak<Presence>` converted to a raw pointer;
        // `on_event_v2` / `on_event_v3` borrow it back with `Weak::from_raw` on every
        // event.
        // NOTE(review): the two `None` arguments are presumably retain/release hooks
        // for the context pointer; if so, the raw weak pointers are never reclaimed.
        // TODO confirm against the `ffi_sdk` signatures.
        unsafe {
            ffi_sdk::ditto_register_presence_v2_callback(
                &self.ditto,
                Arc::downgrade(self).into_raw() as *mut _,
                None,
                None,
                Some(Presence::on_event_v2),
            );
        }

        unsafe {
            ffi_sdk::ditto_register_presence_callback_v3(
                &self.ditto,
                Arc::downgrade(self).into_raw() as *mut _,
                None,
                None,
                Some(Presence::on_event_v3),
            );
        }

        *registered = true;
    }

    /// Registers a v2 presence handler and returns the [`PeersObserver`] that keeps it
    /// alive.
    ///
    /// Only a weak reference to the handler's context is stored here, so the handler
    /// stops being called once the context owned by the returned observer is dropped.
    /// The FFI callbacks are registered on first use. A background thread is spawned
    /// to fetch the current v2 presence and dispatch it through `on_presence_v2` as an
    /// initial event.
    pub(crate) fn add_observer(
        self: &Arc<Self>,
        handler: impl Fn(V2Presence) + Send + Sync + 'static,
    ) -> PeersObserver {
        let observer_ctx = Arc::new(PeersObserverCtx::new(Box::new(handler)));

        // The guard is a temporary of this statement, so the lock is already released
        // by the time `subscribe` runs.
        let already_registered = *self.registered.lock().unwrap();
        if !already_registered {
            self.subscribe();
        }

        // Lock, push, and unlock within a single statement to keep the critical
        // section minimal.
        self.observers_v2
            .lock()
            .unwrap()
            .push(Arc::downgrade(&observer_ctx));

        let presence = self.retain();
        std::thread::spawn(move || {
            unsafe {
                // Initial event.
                let snapshot = ffi_sdk::ditto_presence_v2(&presence.ditto);
                presence.on_presence_v2(snapshot.to_str());
            }
        });

        PeersObserver::new(observer_ctx)
    }

    /// Registers a presence callback and returns the [`PresenceObserver`] that keeps
    /// it alive.
    ///
    /// Only a weak reference to the callback's context is stored here, so the callback
    /// stops being called once the context owned by the returned observer is dropped.
    /// The FFI callbacks are registered on first use. A background thread is spawned
    /// to fetch the current presence graph and dispatch it through `on_presence_v3` as
    /// an initial event.
    pub fn observe(
        self: &Arc<Self>,
        callback: impl Fn(&PresenceGraph) + Send + Sync + 'static,
    ) -> PresenceObserver {
        // The guard is a temporary of this statement, so the lock is already released
        // by the time `subscribe` runs.
        let already_registered = *self.registered.lock().unwrap();
        if !already_registered {
            self.subscribe();
        }

        let observer_ctx = Arc::new(PresenceObserverCtx::new(Box::new(callback)));

        // Lock, push, and unlock within a single statement to keep the critical
        // section minimal.
        self.observers_v3
            .lock()
            .unwrap()
            .push(Arc::downgrade(&observer_ctx));

        let presence = self.retain();
        std::thread::spawn(move || {
            // Initial event.
            let snapshot = unsafe { ffi_sdk::ditto_presence_v3(&presence.ditto) };
            presence.on_presence_v3(snapshot.to_str());
        });

        PresenceObserver::new(observer_ctx)
    }

    /// Returns a snapshot of the currently known peers as a [`PresenceGraph`].
    ///
    /// The graph is fetched synchronously from the FFI layer as JSON and deserialized
    /// on the calling thread.
    ///
    /// # Panics
    ///
    /// Panics if the JSON returned by the FFI layer cannot be deserialized into a
    /// `PresenceGraph`.
    pub fn exec(&self) -> PresenceGraph {
        let snapshot = unsafe { ffi_sdk::ditto_presence_v3(&self.ditto) };
        serde_json::from_str(snapshot.to_str()).unwrap()
    }
}