#![doc(alias = "mesh")]
#![warn(missing_docs)]
use_prelude!();
use std::{
future::Future,
sync::{Mutex, OnceLock, Weak},
};
pub use observer::PresenceObserver;
use crate::{ditto::DittoFields, transport::v2::V2Presence};
mod connection_request_handler;
pub(crate) mod observer;
pub(crate) mod observer_legacy;
pub use self::connection_request_handler::{ConnectionRequest, ConnectionRequestAuthorization};
pub use crate::transport::v3::{Connection, Peer, PresenceGraph, PresenceOs};
/// A JSON object: a `serde_json` map from `String` keys to arbitrary
/// `serde_json::Value`s.
pub type JsonObject = ::serde_json::Map<String, ::serde_json::Value>;
/// Memoized result of parsing the peer metadata JSON.
///
/// `Presence::peer_metadata` compares `json_str` against the freshly fetched
/// JSON text and only re-parses (replacing the whole cache) when they differ.
#[derive(Debug, Default)]
pub(crate) struct PeerMetadataCache {
/// The raw JSON text that `value` was parsed from.
json_str: String,
/// The parsed form of `json_str`, handed out to callers behind an `Arc`.
value: Arc<JsonObject>,
}
/// Entry point for presence-related operations: reading the presence graph,
/// observing it, managing this peer's metadata, and installing a connection
/// request handler.
pub struct Presence {
/// Weak handle to the owning Ditto instance; upgraded before each FFI call.
ditto: Weak<DittoFields>,
/// Last parsed peer metadata, reused by `peer_metadata` while the JSON is unchanged.
peer_metadata_cache: Mutex<PeerMetadataCache>,
/// Legacy v2 observers added through `add_observer`; dead entries are pruned
/// whenever a v2 presence update is dispatched.
observers_v2: Mutex<Vec<Weak<observer_legacy::PeersObserverCtx>>>,
/// Legacy v3 observers added through `observe`; dead entries are pruned
/// whenever a v3 presence update is dispatched.
observers_v3: Mutex<Vec<observer_legacy::WeakPresenceObserver>>,
/// Set to `true` by `subscribe` once the legacy FFI callbacks have been registered.
registered: Mutex<bool>,
}
impl Presence {
pub(crate) fn new(ditto: Weak<DittoFields>) -> Self {
Self {
ditto,
peer_metadata_cache: <_>::default(),
observers_v2: <_>::default(),
observers_v3: <_>::default(),
registered: false.into(),
}
}
/// Returns a snapshot of the current presence graph.
///
/// The graph is fetched from the FFI layer as a JSON buffer and deserialized
/// into a [`PresenceGraph`].
///
/// # Panics
///
/// - If the owning Ditto instance has already been released.
/// - If the buffer returned by the FFI layer is not a valid JSON
///   `PresenceGraph`.
pub fn graph(&self) -> PresenceGraph {
    // State the violated invariant instead of panicking with the generic
    // `Option::unwrap` message.
    let ditto = self
        .ditto
        .upgrade()
        .expect("`Ditto` instance has been released");
    let buffer = ffi_sdk::dittoffi_presence_graph(&ditto.ditto);
    serde_json::from_slice::<PresenceGraph>(&buffer)
        .expect("should receive UTF-8 encoded JSON PresenceGraph")
}
/// Registers `callback` to be invoked with the presence graph, returning the
/// [`PresenceObserver`] handle created for it.
///
/// The callback is wrapped in a mutex that is held for the duration of each
/// invocation, so invocations never overlap.
///
/// # Errors
///
/// Returns `ErrorKind::ReleasedDittoInstance` (converted into the crate
/// error type) if the owning Ditto instance has already been released, or
/// whatever error `PresenceObserver::new` reports.
///
/// # Panics
///
/// A later invocation panics if the internal mutex was poisoned, i.e. if a
/// previous invocation of `callback` panicked.
pub fn register_observer(
    self: &Arc<Self>,
    callback: impl Fn(&PresenceGraph) + Send + Sync + 'static,
) -> Result<PresenceObserver> {
    let ditto = self.ditto.upgrade().ok_or(ErrorKind::ReleasedDittoInstance)?;
    let shared_callback = Arc::new(Mutex::new(callback));
    let on_graph = move |graph: &PresenceGraph| {
        // The guard lives until the end of the closure, serializing calls.
        let guard = shared_callback.lock().unwrap();
        (*guard)(graph);
    };
    PresenceObserver::new(&*ditto.ditto, on_graph)
}
}
impl Presence {
pub fn set_peer_metadata(&self, peer_metadata: &impl Serialize) -> Result<(), DittoError> {
let payload = ::serde_json::to_string(peer_metadata)?;
self.set_peer_metadata_json_str(&payload)
}
/// Sets this peer's metadata from an already-serialized JSON string.
///
/// The string's bytes are passed as-is to the FFI layer.
///
/// # Errors
///
/// Returns `ErrorKind::ReleasedDittoInstance` (converted into
/// [`DittoError`]) if the owning Ditto instance has been released, or the
/// error reported by the FFI call.
pub fn set_peer_metadata_json_str(&self, json: &str) -> Result<(), DittoError> {
    let ditto = self.ditto.upgrade().ok_or(ErrorKind::ReleasedDittoInstance)?;
    let ffi_result = ffi_sdk::dittoffi_presence_set_peer_metadata_json_throws(
        &ditto.ditto,
        json.as_bytes().into(),
    );
    ffi_result.into_rust_result()?;
    Ok(())
}
/// Returns this peer's metadata as the JSON string provided by the FFI layer.
///
/// # Panics
///
/// - If the owning Ditto instance has already been released.
/// - If the bytes returned by the FFI layer are not valid UTF-8.
pub fn peer_metadata_json_str(&self) -> String {
    let ditto = self
        .ditto
        .upgrade()
        .ok_or(ErrorKind::ReleasedDittoInstance)
        .unwrap();
    // FFI buffer -> boxed slice -> `Vec<u8>` -> `String`.
    let boxed: Box<[u8]> = ffi_sdk::dittoffi_presence_peer_metadata_json(&ditto.ditto).into();
    let bytes = Vec::<u8>::from(boxed);
    String::from_utf8(bytes).expect("UTF-8")
}
/// Returns this peer's metadata deserialized into a caller-chosen type `T`.
///
/// # Errors
///
/// Fails if the metadata JSON cannot be deserialized into `T`.
///
/// # Panics
///
/// Panics under the same conditions as `peer_metadata_json_str`.
pub fn peer_metadata_serde<T: DeserializeOwned>(&self) -> Result<T> {
    let json = self.peer_metadata_json_str();
    ::serde_json::from_str::<T>(&json).map_err(Into::into)
}
/// Returns this peer's metadata as a shared, parsed JSON object.
///
/// The parsed object is cached: the JSON is re-parsed only when the text
/// returned by `peer_metadata_json_str` differs from the cached text. A
/// poisoned cache lock is recovered from rather than propagated.
///
/// # Panics
///
/// Panics under the same conditions as `peer_metadata_json_str`, or if the
/// JSON returned by the FFI layer is not a JSON object.
pub fn peer_metadata(&self) -> Arc<JsonObject> {
    let latest = self.peer_metadata_json_str();
    let mut guard = match self.peer_metadata_cache.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    if guard.json_str != latest {
        // Parse before touching the cache so a parse panic leaves it intact.
        let parsed: JsonObject =
            ::serde_json::from_str(&latest).expect("incorrect json from `dittoffi`");
        guard.value = Arc::new(parsed);
        guard.json_str = latest;
    }
    guard.value.retain()
}
/// Installs (or replaces) the connection request handler.
///
/// `handler_or_none` is converted through `sealed::IntoOption`. When it
/// yields a callback, each incoming connection request is wrapped in a
/// [`ConnectionRequest`], passed to the callback, and the returned
/// [`ConnectionRequestAuthorization`] is reported back to the FFI layer.
/// When it yields `None`, `None` is forwarded to the FFI layer (presumably
/// clearing any installed handler — confirm against the FFI contract).
///
/// # Panics
///
/// Panics if the owning Ditto instance has already been released.
pub fn set_connection_request_handler<
    F: sealed::IntoOption<
        impl 'static + Send + Sync + Fn(ConnectionRequest) -> ConnectionRequestAuthorization,
    >,
>(
    &self,
    handler_or_none: F,
) {
    let ditto = self
        .ditto
        .upgrade()
        .ok_or(ErrorKind::ReleasedDittoInstance)
        .unwrap();
    let maybe_handler = F::into_option(handler_or_none);
    let ffi_callback = maybe_handler.map(|callback| {
        let trampoline = move |raw: repr_c::Box<ffi_sdk::FfiConnectionRequest>| {
            let request = ConnectionRequest::new(raw);
            // Grab the raw handle first: `request` is moved into the callback.
            let raw_handle = request.raw();
            let authorization = callback(request);
            authorization.into_ffi(&raw_handle);
        };
        Arc::new(trampoline).into()
    });
    ffi_sdk::dittoffi_presence_set_connection_request_handler(&ditto.ditto, ffi_callback)
}
/// Installs a connection request handler whose decision is produced by a
/// future.
///
/// For each incoming request, `async_callback` is called synchronously from
/// the FFI callback to create the future; that future is then sent to a
/// dedicated background thread, which awaits it and reports the resulting
/// [`ConnectionRequestAuthorization`] back to the FFI layer.
///
/// The background thread runs a current-thread `tokio` runtime and is
/// started lazily, on the first incoming request. It awaits the submitted
/// futures one at a time, in the order they were received.
///
/// # Panics
///
/// - Immediately, if the owning Ditto instance has already been released.
/// - From within the FFI callback, if the runtime cannot be built or if the
///   task cannot be sent to the background thread.
pub fn set_connection_request_handler_async<ConnectionRequestAuthorizationFut>(
&self,
async_callback: impl 'static
+ Send
+ Sync
+ Fn(ConnectionRequest) -> ConnectionRequestAuthorizationFut,
) where
ConnectionRequestAuthorizationFut:
'static + Send + Future<Output = ConnectionRequestAuthorization>,
{
use tokio::{runtime, sync::mpsc};
let ditto = self
.ditto
.upgrade()
.ok_or(ErrorKind::ReleasedDittoInstance)
.unwrap();
// Spawns the worker thread and returns the sending half of its task queue.
// It captures nothing, so it can be handed to `get_or_init` by value on
// every invocation of the handler below.
let new_mini_runtime = || {
let (tx, mut rx) = mpsc::unbounded_channel();
let runtime = runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to build `async` runtime");
::std::thread::spawn(move || {
runtime.block_on(async move {
// Tasks are awaited sequentially; the loop ends once every sender is dropped.
while let Some(task) = rx.recv().await {
() = task.await;
}
})
});
tx
};
ffi_sdk::dittoffi_presence_set_connection_request_handler(
&ditto.ditto,
Some(
Arc::new(move |raw| {
// The sender lives in a `static`, so the worker thread is created at
// most once and reused by later invocations.
static MINI_RUNTIME: OnceLock<
mpsc::UnboundedSender<Pin<Box<dyn Send + Future<Output = ()>>>>,
> = OnceLock::new();
let connection_request = ConnectionRequest::new(raw);
// Taken before `connection_request` is moved into the callback.
let raw = connection_request.raw();
let task_to_spawn_detached = {
// The user callback runs here, on the calling thread; only the
// returned future is awaited on the worker thread.
let async_callback_connection_request = async_callback(connection_request);
async move {
async_callback_connection_request.await.into_ffi(&raw);
}
};
MINI_RUNTIME
.get_or_init(new_mini_runtime)
.send(Box::pin(task_to_spawn_detached))
.expect("dedicated async runtime to be alive");
})
.into(),
),
)
}
}
#[allow(deprecated)]
impl Presence {
#[doc(hidden)]
#[deprecated(note = "Use `.graph()` instead")]
pub fn exec(&self) -> PresenceGraph {
self.graph()
}
/// Legacy way of observing the presence graph.
///
/// Registers the legacy FFI callbacks if that has not happened yet, records a
/// weak reference to the new observer, and spawns a thread that fetches the
/// current v3 presence JSON and dispatches it to the registered observers.
///
/// # Panics
///
/// Panics if one of the internal locks is poisoned, or (through `subscribe`)
/// if the owning Ditto instance has already been released.
#[doc(hidden)]
#[allow(deprecated)]
#[deprecated(note = "Use `ditto.presence().register_observer(...)` instead")]
pub fn observe(
    self: &Arc<Self>,
    callback: impl Fn(&PresenceGraph) + Send + Sync + 'static,
) -> observer_legacy::PresenceObserver {
    // The guard is released at the end of this statement, before `subscribe`
    // takes the same lock.
    let already_registered = *self.registered.lock().unwrap();
    if !already_registered {
        self.subscribe();
    }
    let ctx = Arc::new(observer_legacy::PresenceObserverCtx::new(Box::new(callback)));
    self.observers_v3.lock().unwrap().push(Arc::downgrade(&ctx));
    let this = self.retain();
    ::std::thread::spawn(move || {
        if let Some(ditto) = this.ditto.upgrade() {
            let str_box = ffi_sdk::ditto_presence_v3(&ditto.ditto);
            this.on_presence_v3(str_box.to_str());
        }
    });
    observer_legacy::PresenceObserver::new(ctx)
}
/// Legacy way of observing v2 presence updates.
///
/// Registers the legacy FFI callbacks if that has not happened yet, records a
/// weak reference to the new observer, and spawns a thread that fetches the
/// current v2 presence JSON and dispatches it to the registered observers.
///
/// # Panics
///
/// Panics if one of the internal locks is poisoned, or (through `subscribe`)
/// if the owning Ditto instance has already been released.
#[deprecated(note = "Use `presence().observe()` instead")]
pub(crate) fn add_observer(
    self: &Arc<Self>,
    handler: impl Fn(V2Presence) + Send + Sync + 'static,
) -> PeersObserver {
    let ctx = Arc::new(observer_legacy::PeersObserverCtx::new(Box::new(handler)));
    // The guard is released at the end of this statement, before `subscribe`
    // takes the same lock.
    let already_registered = *self.registered.lock().unwrap();
    if !already_registered {
        self.subscribe();
    }
    self.observers_v2.lock().unwrap().push(Arc::downgrade(&ctx));
    let this = self.retain();
    ::std::thread::spawn(move || {
        if let Some(ditto) = this.ditto.upgrade() {
            let str_box = ffi_sdk::ditto_presence_v2(&ditto.ditto);
            this.on_presence_v2(str_box.to_str());
        }
    });
    PeersObserver::new(ctx)
}
/// FFI entry point for v2 presence updates: forwards `json` to
/// `on_presence_v2` if the `Presence` behind `ctx` is still alive.
///
/// # Safety
///
/// `ctx` must be a pointer that is valid for `Weak::<Presence>::from_raw`
/// and whose weak reference is still held by the caller.
#[deprecated(note = "Use `ditto.presence().register_observer(...)` instead")]
unsafe extern "C" fn on_event_v2(ctx: *mut c_void, json: char_p::Ref<'_>, _: Blocking) {
    Self::borrowing_from_ctx(ctx, |weak_ctx| {
        let Some(presence) = weak_ctx.upgrade() else {
            return;
        };
        presence.on_presence_v2(json.to_str());
    })
}
/// FFI entry point for v3 presence updates: forwards `json` to
/// `on_presence_v3` if the `Presence` behind `ctx` is still alive.
///
/// # Safety
///
/// `ctx` must be a pointer that is valid for `Weak::<Presence>::from_raw`
/// and whose weak reference is still held by the caller.
#[deprecated(note = "Use `ditto.presence().register_observer(...)` instead")]
unsafe extern "C" fn on_event_v3(ctx: *mut c_void, json: char_p::Ref<'_>, _: Blocking) {
    Self::borrowing_from_ctx(ctx, |weak_ctx| {
        let Some(presence) = weak_ctx.upgrade() else {
            return;
        };
        presence.on_presence_v3(json.to_str());
    })
}
/// Dispatches a v2 presence update to every live legacy v2 observer.
///
/// Does nothing when there are no observers or when `json_str` cannot be
/// parsed as a `V2Presence`. Observers whose owner has been dropped are
/// removed from the list while dispatching. The observer list stays locked
/// for the whole dispatch.
#[deprecated(note = "Use `ditto.presence().register_observer(...)` instead")]
fn on_presence_v2(&self, json_str: &str) {
    let mut observers = self.observers_v2.lock().unwrap();
    if observers.is_empty() {
        return;
    }
    let Ok(presence) = serde_json::from_str::<V2Presence>(json_str) else {
        return;
    };
    observers.retain(|weak_observer| match weak_observer.upgrade() {
        Some(observer) => {
            (observer.on_presence)(presence.clone());
            true
        }
        None => false,
    });
}
/// Dispatches a v3 presence update to every live legacy v3 observer.
///
/// Does nothing when there are no observers or when `json_str` cannot be
/// parsed. Observers whose owner has been dropped are removed from the list
/// while dispatching. The observer list stays locked for the whole dispatch.
#[deprecated(note = "Use `ditto.presence().register_observer(...)` instead")]
fn on_presence_v3(&self, json_str: &str) {
    let mut observers = self.observers_v3.lock().unwrap();
    if observers.is_empty() {
        return;
    }
    let Ok(graph) = serde_json::from_str(json_str) else {
        return;
    };
    observers.retain(|weak_observer| match weak_observer.upgrade() {
        Some(observer) => {
            (observer.on_presence)(&graph);
            true
        }
        None => false,
    });
}
/// Registers the legacy v2 and v3 presence callbacks with the FFI layer.
///
/// Registration happens at most once per `Presence`: the `registered` flag
/// is checked and set under a single lock acquisition, so concurrent callers
/// that all saw `registered == false` cannot register the callbacks twice.
/// Later calls return without doing anything.
///
/// # Panics
///
/// Panics if the owning Ditto instance has already been released, or if the
/// `registered` lock is poisoned.
#[deprecated(note = "Use `ditto.presence().register_observer(...)` instead")]
fn subscribe(self: &Arc<Self>) {
    let ditto = self
        .ditto
        .upgrade()
        .ok_or(ErrorKind::ReleasedDittoInstance)
        .unwrap();
    // Callers test the flag and drop its lock before calling us, so the
    // authoritative check has to happen here, under the same guard that
    // later sets the flag.
    let mut registered = self.registered.lock().unwrap();
    if *registered {
        return;
    }
    let weak_self = Arc::downgrade(self);
    // SAFETY: `weak_self` is alive for both calls, so the pointer from
    // `as_ptr` is valid for `Presence::retain` (which borrows it and leaks a
    // clone) and, for pointers produced that way, `Presence::release`.
    // NOTE(review): assumes the FFI layer retains `ctx` before each call
    // returns — confirm against the FFI contract.
    unsafe {
        ffi_sdk::ditto_register_presence_v2_callback(
            &ditto.ditto,
            weak_self.as_ptr() as *mut _,
            Some(Presence::retain),
            Some(Presence::release),
            Some(<unsafe extern "C" fn(_, char_p::Ref<'_>, _)>::into(
                Presence::on_event_v2,
            )),
        );
        ffi_sdk::ditto_register_presence_callback_v3(
            &ditto.ditto,
            weak_self.as_ptr() as *mut _,
            Some(Presence::retain),
            Some(Presence::release),
            Some(<unsafe extern "C" fn(_, char_p::Ref<'_>, _)>::into(
                Presence::on_event_v3,
            )),
        );
    }
    drop(weak_self);
    *registered = true;
}
/// Temporarily views the raw `ctx` pointer as a `Weak<Presence>` and passes a
/// reference to it to `yielding`, without changing the weak count.
///
/// # Safety
///
/// `ctx` must be a pointer that is valid for `Weak::<Presence>::from_raw`,
/// and the weak reference it stands for must still be held by the caller.
#[deprecated(note = "Use `ditto.presence().register_observer(...)` instead")]
#[track_caller]
unsafe fn borrowing_from_ctx(ctx: *const c_void, yielding: impl FnOnce(&Weak<Presence>)) {
    let presence_ptr = ctx.cast::<Presence>();
    // `ManuallyDrop` keeps the reconstructed `Weak` from being dropped, even
    // if `yielding` panics: the reference is borrowed, not owned.
    let borrowed = ::core::mem::ManuallyDrop::new(Weak::from_raw(presence_ptr));
    yielding(&*borrowed)
}
/// FFI "retain" hook: adds one weak reference to the `Presence` behind `ctx`.
///
/// The extra reference is intentionally leaked as a raw pointer; it is
/// reclaimed by a matching call to `release`.
///
/// # Safety
///
/// Same requirements on `ctx` as `borrowing_from_ctx`.
#[deprecated(note = "Use `ditto.presence().register_observer(...)` instead")]
unsafe extern "C" fn retain(ctx: *mut c_void, _: Blocking) {
    Self::borrowing_from_ctx(ctx, |weak_ctx| {
        let extra_reference = weak_ctx.clone();
        _ = Weak::into_raw(extra_reference);
    })
}
/// FFI "release" hook: drops one weak reference to the `Presence` behind `ctx`.
///
/// # Safety
///
/// `ctx` must be a pointer valid for `Weak::<Presence>::from_raw` whose weak
/// reference has not been released yet; that reference is consumed here.
#[deprecated(note = "Use `ditto.presence().register_observer(...)` instead")]
unsafe extern "C" fn release(ctx: *mut c_void, _: Blocking) {
    let reclaimed: Weak<Presence> = Weak::from_raw(ctx.cast::<Presence>());
    drop(reclaimed)
}
}
/// The kind of transport a connection between two peers uses.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConnectionType {
/// Mirrors the FFI layer's `Bluetooth` connection type.
Bluetooth,
/// Mirrors the FFI layer's `AccessPoint` connection type.
AccessPoint,
/// Mirrors the FFI layer's `P2PWiFi` connection type.
P2PWiFi,
/// Mirrors the FFI layer's `WebSocket` connection type.
WebSocket,
/// Fallback produced by `from_ffi` for any FFI value that is not one of the
/// variants above.
#[doc(hidden)]
Unknown,
}
impl ConnectionType {
pub(crate) fn from_ffi(ffi: ::ffi_sdk::ConnectionType) -> Self {
match ffi {
::ffi_sdk::ConnectionType::Bluetooth => Self::Bluetooth,
::ffi_sdk::ConnectionType::AccessPoint => Self::AccessPoint,
::ffi_sdk::ConnectionType::P2PWiFi => Self::P2PWiFi,
::ffi_sdk::ConnectionType::WebSocket => Self::WebSocket,
#[allow(unreachable_patterns)]
_ => {
debug!(connection_type = ?ffi, "got unknown `ConnectionType`");
Self::Unknown
}
}
}
}
/// Private helper module: `IntoOption` cannot be named outside this file, so
/// only the implementations below can satisfy bounds that use it.
mod sealed {
    use super::*;

    /// Conversion of a "handler or nothing" argument into an optional
    /// connection request handler of type `F`.
    pub trait IntoOption<F>
    where
        F: 'static + Send + Sync + Fn(ConnectionRequest) -> ConnectionRequestAuthorization,
    {
        /// Returns `Some(handler)` when a handler was supplied, `None` otherwise.
        fn into_option(_: Self) -> Option<F>;
    }

    /// An `Option` of the uninhabited `Never` type can only be `None`, so it
    /// always converts to "no handler".
    impl IntoOption<fn(ConnectionRequest) -> ConnectionRequestAuthorization>
        for Option<::never_say_never::Never>
    {
        fn into_option(
            _no_handler: Self,
        ) -> Option<fn(ConnectionRequest) -> ConnectionRequestAuthorization> {
            None
        }
    }

    /// Any suitable callable converts to itself, wrapped in `Some`.
    impl<F> IntoOption<F> for F
    where
        F: 'static + Send + Sync + Fn(ConnectionRequest) -> ConnectionRequestAuthorization,
    {
        fn into_option(handler: Self) -> Option<Self> {
            Some(handler)
        }
    }
}