#![doc(alias = "mesh")]
#![warn(missing_docs)]
use_prelude!();
use std::{
future::Future,
sync::{Mutex, OnceLock, Weak},
};
pub use observer::PresenceObserver;
use crate::{debug, ditto::DittoFields};
mod connection_request_handler;
pub(crate) mod observer;
pub use self::connection_request_handler::{ConnectionRequest, ConnectionRequestAuthorization};
pub use crate::transport::v3::{Connection, Peer, PresenceGraph, PresenceOs};
pub type JsonObject = ::serde_json::Map<String, ::serde_json::Value>;
#[derive(Debug, Default)]
pub(crate) struct PeerMetadataCache {
json_str: String,
value: Arc<JsonObject>,
}
pub struct Presence {
ditto: Weak<DittoFields>,
peer_metadata_cache: Mutex<PeerMetadataCache>,
}
impl Presence {
pub(crate) fn new(ditto: Weak<DittoFields>) -> Self {
Self {
ditto,
peer_metadata_cache: <_>::default(),
}
}
pub fn graph(&self) -> PresenceGraph {
let ditto = self.ditto.upgrade().unwrap();
let buffer = ffi_sdk::dittoffi_presence_graph(&ditto.ditto);
serde_json::from_slice::<PresenceGraph>(&buffer)
.expect("should receive UTF-8 encoded JSON PresenceGraph")
}
pub fn register_observer(
self: &Arc<Self>,
callback: impl Fn(&PresenceGraph) + Send + Sync + 'static,
) -> Result<PresenceObserver> {
let ditto = self
.ditto
.upgrade()
.ok_or(ErrorKind::ReleasedDittoInstance)?;
let callback = Arc::new(Mutex::new(callback));
let callback = move |graph: &PresenceGraph| {
let callback = callback.lock().unwrap();
callback(graph);
};
PresenceObserver::new(&*ditto.ditto, callback)
}
}
impl Presence {
pub fn set_peer_metadata(&self, peer_metadata: &impl Serialize) -> Result<(), DittoError> {
let payload = ::serde_json::to_string(peer_metadata)?;
self.set_peer_metadata_json_str(&payload)
}
pub fn set_peer_metadata_json_str(&self, json: &str) -> Result<(), DittoError> {
let ditto = self
.ditto
.upgrade()
.ok_or(ErrorKind::ReleasedDittoInstance)?;
ffi_sdk::dittoffi_presence_set_peer_metadata_json_throws(
&ditto.ditto,
json.as_bytes().into(),
)
.into_rust_result()?;
Ok(())
}
pub fn peer_metadata_json_str(&self) -> String {
let ditto = self
.ditto
.upgrade()
.ok_or(ErrorKind::ReleasedDittoInstance)
.unwrap();
String::from_utf8(From::<Box<[u8]>>::from(
ffi_sdk::dittoffi_presence_peer_metadata_json(&ditto.ditto).into(),
))
.expect("UTF-8")
}
pub fn peer_metadata_serde<T: DeserializeOwned>(&self) -> Result<T> {
let value = ::serde_json::from_str(&self.peer_metadata_json_str())?;
Ok(value)
}
pub fn peer_metadata(&self) -> Arc<JsonObject> {
let json_str = self.peer_metadata_json_str();
let mut cache = self
.peer_metadata_cache
.lock()
.unwrap_or_else(|it| it.into_inner());
if json_str != cache.json_str {
*cache = PeerMetadataCache {
value: Arc::new(
::serde_json::from_str(&json_str).expect("incorrect json from `dittoffi`"),
),
json_str,
};
}
cache.value.retain()
}
pub fn set_connection_request_handler<
F: sealed::IntoOption<
impl 'static + Send + Sync + Fn(ConnectionRequest) -> ConnectionRequestAuthorization,
>,
>(
&self,
handler_or_none: F,
) {
let ditto = self
.ditto
.upgrade()
.ok_or(ErrorKind::ReleasedDittoInstance)
.unwrap();
let ffi_callback = F::into_option(handler_or_none).map(|callback| {
Arc::new(move |raw: repr_c::Box<ffi_sdk::FfiConnectionRequest>| {
let connection_request = ConnectionRequest::new(raw);
let raw = connection_request.raw();
callback(connection_request).into_ffi(&raw);
})
.into()
});
ffi_sdk::dittoffi_presence_set_connection_request_handler(&ditto.ditto, ffi_callback)
}
pub fn set_connection_request_handler_async<ConnectionRequestAuthorizationFut>(
&self,
async_callback: impl 'static
+ Send
+ Sync
+ Fn(ConnectionRequest) -> ConnectionRequestAuthorizationFut,
) where
ConnectionRequestAuthorizationFut:
'static + Send + Future<Output = ConnectionRequestAuthorization>,
{
use tokio::{runtime, sync::mpsc};
let ditto = self
.ditto
.upgrade()
.ok_or(ErrorKind::ReleasedDittoInstance)
.unwrap();
let new_mini_runtime = || {
let (tx, mut rx) = mpsc::unbounded_channel();
let runtime = runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to build `async` runtime");
::std::thread::spawn(move || {
runtime.block_on(async move {
while let Some(task) = rx.recv().await {
() = task.await;
}
})
});
tx
};
ffi_sdk::dittoffi_presence_set_connection_request_handler(
&ditto.ditto,
Some(
Arc::new(move |raw| {
static MINI_RUNTIME: OnceLock<
mpsc::UnboundedSender<Pin<Box<dyn Send + Future<Output = ()>>>>,
> = OnceLock::new();
let connection_request = ConnectionRequest::new(raw);
let raw = connection_request.raw();
let task_to_spawn_detached = {
let async_callback_connection_request = async_callback(connection_request);
async move {
async_callback_connection_request.await.into_ffi(&raw);
}
};
MINI_RUNTIME
.get_or_init(new_mini_runtime)
.send(Box::pin(task_to_spawn_detached))
.expect("dedicated async runtime to be alive");
})
.into(),
),
)
}
}
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConnectionType {
Bluetooth,
AccessPoint,
P2PWiFi,
WebSocket,
#[doc(hidden)]
Unknown,
}
impl ConnectionType {
pub(crate) fn from_ffi(ffi: ::ffi_sdk::ConnectionType) -> Self {
match ffi {
::ffi_sdk::ConnectionType::Bluetooth => Self::Bluetooth,
::ffi_sdk::ConnectionType::AccessPoint => Self::AccessPoint,
::ffi_sdk::ConnectionType::P2PWiFi => Self::P2PWiFi,
::ffi_sdk::ConnectionType::WebSocket => Self::WebSocket,
#[allow(unreachable_patterns)]
_ => {
#[allow(deprecated)]
{
debug!(connection_type = ?ffi, "got unknown `ConnectionType`");
}
Self::Unknown
}
}
}
}
mod sealed {
use super::*;
pub trait IntoOption<
F: 'static + Send + Sync + Fn(ConnectionRequest) -> ConnectionRequestAuthorization,
>
{
fn into_option(_: Self) -> Option<F>;
}
impl IntoOption<fn(ConnectionRequest) -> ConnectionRequestAuthorization>
for Option<::never_say_never::Never>
{
fn into_option(_: Self) -> Option<fn(ConnectionRequest) -> ConnectionRequestAuthorization> {
None
}
}
impl<F> IntoOption<F> for F
where
F: 'static + Send + Sync + Fn(ConnectionRequest) -> ConnectionRequestAuthorization,
{
fn into_option(f: Self) -> Option<Self> {
Some(f)
}
}
}