dittolive-ditto 4.9.3

Ditto is a peer to peer cross-platform database that allows mobile, web, IoT and server apps to sync with or without an internet connection.
#![warn(missing_docs, clippy::missing_safety_doc, clippy::missing_errors_doc)]

use std::{
    future::IntoFuture,
    num::NonZeroUsize,
    ops::{Deref, DerefMut},
    sync::Arc,
};

pub use bus::{ConnectionError, Payload, Reliability, SendStatus};
use dittolive_ditto_base::{
    bus::{self, Callback, IAcceptor, IBus, ISendHandle, IStream, IStreamCandidate},
    peer_pubkey::PeerPubkey,
};
use safer_ffi::{bytes::Bytes, option::TaggedOption};

/// # Ditto Data Streams
/// Ditto Data Streams allow you to send datagrams across a Ditto mesh.
///
/// This structure is the core accessor to the Ditto Data Streams, which may remind you of some TCP
/// APIs you've seen before (except this one can work without an IP connection too!).
///
/// While exploring the API from [`Bus::bind_topic`], [`Bus::connect`] and the examples their docs
/// contain is probably the easiest way to approach it, there are a few subjects that are global to
/// the API: giving them a look could be helpful.
///
/// The entire API is built around the builder pattern: a rather convenient way to explore it is to
/// simply let your code completion guide you around it.
///
/// ## Topics
/// Much like TCP and UDP have ports, Ditto Data Streams have `topic`s: instead of integers, you can
/// name your ports after anything you like!
///
/// The equivalent of binding a socket is to use [`Bus::bind_topic`], to which you'll provide a
/// topic (or name).
///
/// Any attempt to [`Bus::connect`] to your peer will have to specify on which topic the connection
/// should be made, the connection appearing as a new [`Stream`] for your [`Acceptor`].
///
/// ## [`Payload`] and [`IntoPayload`]
/// To carry data around, the Ditto Data Streams API uses [`Payload`]s, also known as
/// [`safer_ffi::bytes::Bytes`]: much like the more widely known
/// [`bytes::Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html), it's a cheaply
/// cloneable and sliceable chunk of contiguous memory, but it has a few low-level tricks up its
/// sleeve to be even more efficient.
///
/// And while it's easy enough to construct it from common owned slice types, [`IntoPayload`] is a
/// very friendly option to make serialization less verbose, yet explicit and flexible.
///
/// ## [`IntoChannel`]: We promise, it's not as scary as it looks
/// In this API, anytime we think you may want the choice between closures and channels to handle
/// certain events you could subscribe to, we've used the following signature pattern:
/// ```
/// # use dittolive_ditto::experimental::bus::{Channel, IntoChannel};
/// # struct Event; struct Subscriber<T>(T);
/// fn subscribe<OverloadId, Handler>(handler: Handler) -> Subscriber<Handler::Receiver>
/// where
///     Handler: IntoChannel<OverloadId, Event>,
///     Handler::Sender: Channel<OverloadId, Event>,
/// # {unimplemented!()}
/// ```
///
/// While this looks imposing, the pattern is quite convenient: it lets you write little to no
/// boilerplate in most cases, while giving you the flexibility to use any handler you please
/// and allowing us to minimize the overhead of calling your handler.
///
/// In most cases, all you need to do to use such an API is to call it with
/// - a closure: `let handle = subscribe(|e: Event| println!("{e}"));` (you may want to keep
///   `handle` around, as dropping it will cancel that subscription).
/// - any channel implementation you like: `let handle = subscribe(std::sync::mpsc::channel())`
///   (here, `handle` will deref to the receiver half of the channel, letting you call
///   `handle.recv()` whenever you like).
///
/// `OverloadId` is actually a charm that tricks the compiler into letting you bypass the orphan
/// rule<sup>[(1)](https://doc.rust-lang.org/book/ch10-02-traits.html#:~:text=orphan%20rule)[(2)](https://github.com/Ixrec/rust-orphan-rules)</sup>,
/// and is described in more detail in [`IntoChannel`]'s documentation, which also contains an
/// example of how to use that trick to add support for
/// [`flume`](https://crates.io/crates/flume)'s excellent channel implementation to Ditto's Data
/// Streams API.
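///
/// The marker-parameter idea behind `OverloadId` can be sketched with the standard library
/// alone. Everything below (`Event`, `ViaClosure`, `ViaChannel`, `IntoHandler`, `subscribe`) is
/// a hypothetical stand-in and not this crate's actual implementation; it only illustrates how
/// an extra type parameter can let two otherwise conflicting impls coexist while the compiler
/// still infers which one applies.
///
/// ```rust
/// use std::{cell::Cell, rc::Rc, sync::mpsc};
///
/// // Hypothetical event type, standing in for whatever a subscription delivers.
/// #[derive(Debug, PartialEq)]
/// struct Event(u32);
///
/// // Hypothetical marker types playing the role of `OverloadId`.
/// struct ViaClosure;
/// struct ViaChannel;
///
/// // Hypothetical conversion trait: the marker makes each impl a distinct trait instance.
/// trait IntoHandler<OverloadId> {
///     type Receiver;
///     fn into_handler(self) -> (Box<dyn Fn(Event)>, Self::Receiver);
/// }
///
/// // Blanket impl for closures: nothing is handed back to the caller.
/// impl<F: Fn(Event) + 'static> IntoHandler<ViaClosure> for F {
///     type Receiver = ();
///     fn into_handler(self) -> (Box<dyn Fn(Event)>, ()) {
///         let call: Box<dyn Fn(Event)> = Box::new(self);
///         (call, ())
///     }
/// }
///
/// // Impl for a std channel pair: events go to the sender, the caller keeps the receiver.
/// impl IntoHandler<ViaChannel> for (mpsc::Sender<Event>, mpsc::Receiver<Event>) {
///     type Receiver = mpsc::Receiver<Event>;
///     fn into_handler(self) -> (Box<dyn Fn(Event)>, Self::Receiver) {
///         let (tx, rx) = self;
///         let call: Box<dyn Fn(Event)> = Box::new(move |e: Event| {
///             let _ = tx.send(e);
///         });
///         (call, rx)
///     }
/// }
///
/// // `Id` is never named by the caller: it is inferred from the handler's type.
/// fn subscribe<Id, H: IntoHandler<Id>>(handler: H) -> (Box<dyn Fn(Event)>, H::Receiver) {
///     handler.into_handler()
/// }
///
/// // Closure handler.
/// let seen = Rc::new(Cell::new(0));
/// let seen_in_handler = Rc::clone(&seen);
/// let (call, ()) = subscribe(move |e: Event| seen_in_handler.set(e.0));
/// call(Event(7));
/// assert_eq!(seen.get(), 7);
///
/// // Channel handler.
/// let (call, rx) = subscribe(mpsc::channel::<Event>());
/// call(Event(9));
/// assert_eq!(rx.recv().unwrap(), Event(9));
/// ```
///
/// In this sketch, removing the marker parameter would make the compiler reject the two impls
/// as potentially overlapping, which is the kind of restriction the extra parameter sidesteps.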
#[derive(Clone)]
pub struct Bus {
    bus: bus::Bus,
}

impl Bus {
    pub(crate) fn new(inner: bus::Bus) -> Self {
        Self { bus: inner }
    }

    /// Returns a builder for creating a new [`Acceptor`] for a given `topic` on this peer.
    ///
    /// An [`Acceptor`] is responsible for handling remotely initiated stream connections.
    ///
    /// You can only have one [`Acceptor`] per topic at any given time. The builder returns a
    /// handle to the [`Acceptor`]; dropping the handle removes the [`Acceptor`] from the bus.
    ///
    /// ```
    /// # use dittolive_ditto::prelude::Ditto;
    /// # use dittolive_ditto::experimental::{bus::{Acceptor, Bus, ReliabilityMode}, peer_pubkey::PeerPubkey};
    /// # use anyhow::Result;
    /// # use safer_ffi::bytes::Bytes;
    /// # async fn usage(ditto: &Ditto, target: PeerPubkey) -> Result<()> {
    /// use tokio::sync::mpsc::unbounded_channel as channel; // (`std::sync::mpsc::channel` would also work)
    /// let bus = ditto.bus().expect("The bus must have been enabled using `DittoBuilder::with_experimental_bus` to use this feature");
    /// let mut acceptor: Acceptor<_> = bus.bind_topic("ping")
    ///                                    .reliability(ReliabilityMode::Reliable)
    ///                                    .on_receive_factory(channel)
    ///                                    .finish(channel())?;
    /// while let Some(mut stream) = acceptor.recv().await {
    ///     tokio::task::spawn(async move {
    ///         while let Some(packet) = stream.recv().await {
    ///             stream.message(packet).send();
    ///         }
    ///     });
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn bind_topic(&self, topic: impl TryInto<bus::Topic>) -> AcceptorBuilder<'_, ExactMatch> {
        AcceptorBuilder {
            bus: self,
            inner: ExactMatch(topic.try_into().ok()),
            reliability: bus::Reliability::Reliable,
            opener: (),
            marker: core::marker::PhantomData,
        }
    }

    /// Returns a builder for creating a new [`Stream`] to a remote peer.
    ///
    /// A `topic` is a unique identifier for the stream that must be agreed upon by both peers. It
    /// is possible to create multiple [`Stream`]s to the same peer with different topics.
    ///
    /// ```
    /// # use dittolive_ditto::prelude::Ditto;
    /// # use dittolive_ditto::experimental::{bus::{Bus, ReliabilityMode}, peer_pubkey::PeerPubkey};
    /// # use anyhow::Result;
    /// # use safer_ffi::bytes::Bytes;
    /// # async fn usage(ditto: &Ditto, target: PeerPubkey) -> Result<()> {
    /// use tokio::sync::mpsc::unbounded_channel as channel; // (`std::sync::mpsc::channel` would also work)
    /// let bus = ditto.bus().expect("The bus must have been enabled using `DittoBuilder::with_experimental_bus` to use this feature");
    /// let mut stream = bus.connect(target, "ping")
    ///                     .reliability(ReliabilityMode::Reliable)
    ///                     .on_receive_factory(channel)
    ///                     .finish_async().await.unwrap();
    /// for i in 0..10 {
    ///     let start = std::time::Instant::now();
    ///     stream.message("ping").send();
    ///     stream.recv().await;
    ///     println!("Ping {i} = {t:?}", t=start.elapsed());
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn connect(
        &self,
        peer: impl Into<PeerPubkey>,
        topic: impl TryInto<bus::Topic>,
    ) -> ConnectionBuilder<'_> {
        ConnectionBuilder {
            bus: self,
            peer: peer.into(),
            topic: topic.try_into().ok(),
            reliability: bus::Reliability::Reliable,
            on_receive: (),
            marker: core::marker::PhantomData,
        }
    }
}

pub use acceptor::*;
mod acceptor;

pub use connection_builder::*;
mod connection_builder;

pub use inbound::*;
mod inbound;

pub use stream_candidate::*;
mod stream_candidate;

pub use stream::*;
mod stream;

pub use message_builder::*;
mod message_builder;

pub use send_handle::*;
mod send_handle;

pub use helper_traits::*;
mod helper_traits;