dittolive-ditto-core 4.8.0-experimental-rust.2

This is a support crate for Ditto: dittolive-ditto is the crate we intend for you to interact with.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
//! This file MUST be kept in sync with dittolive-ditto-sys/src/bus.rs under penalty of
//! ABI-incompatibility!
//!
//! This is the FFI for the Bus Streams API.

use safer_ffi::{
    derive_ReprC,
    dyn_traits::VirtualPtr,
    layout::{CLayoutOf, ReprC, __HasNiche__},
    option::TaggedOption,
};

use crate::peer_pubkey::PeerPubkey;

/// A boxed `FnMut` closure from `safer_ffi` that takes a single `T` argument and returns `()`.
pub type Callback<T> = safer_ffi::closure::BoxDynFnMut1<(), T>;
/// The same underlying closure type as [`Callback`].
///
/// Note that the `'a` lifetime parameter does not appear in the aliased type.
pub type Continuation<'a, T> = safer_ffi::closure::BoxDynFnMut1<(), T>;

/// The Ditto Bus API uses [`Bytes`](safer_ffi::bytes::Bytes) with a `'static` lifetime as its
/// payload type.
pub type Payload = safer_ffi::bytes::Bytes<'static>;

/// The Bus itself. It allows listening for and opening connections (and might set up a default
/// one for single messages too).
///
/// This is a `#[repr(C)]` wrapper around a virtual pointer to an [`IBus`] implementation.
#[derive_ReprC]
#[repr(C)]
pub struct Bus {
    /// The type-erased [`IBus`] implementation, required to be `Send + Sync`.
    pub inner: VirtualPtr<dyn IBus + Send + Sync>,
}
/// The Bus itself. It allows listening for and opening connections (and might set up a default
/// one for single messages too).
///
/// This trait is the interface used behind [`Bus`] as a `dyn IBus`; every method is `extern "C"`.
#[derive_ReprC(dyn, Clone)]
pub trait IBus {
    /// Adds an acceptor to the acceptor queue.
    ///
    /// `on_accept` is a callback receiving a [`Stream`].
    ///
    /// If the `topic` was already in use, `None` is returned; otherwise the returned
    /// [`AcceptorId`] identifies the newly added acceptor.
    extern "C" fn add_acceptor(
        &self,
        topic: safer_ffi::bytes::Bytes<'static>,
        on_accept: Callback<Stream>,
    ) -> Option<AcceptorId>;

    /// Removes an acceptor from the acceptor queue, based on the ID returned when it was added.
    ///
    /// `id` is an `Option`, so the return value of [`add_acceptor`](IBus::add_acceptor) can be
    /// passed as-is.
    extern "C" fn delete_acceptor(&self, id: Option<AcceptorId>);

    /// Attempts to establish a connection, yielding a connection result once done.
    ///
    /// `with` describes the stream to open (peer and topic, see [`IStreamInfo`]); `then` is
    /// called with the resulting [`ConnectionResult`].
    extern "C" fn connect(
        &self,
        with: StreamInfo<'_>,
        then: Continuation<'static, ConnectionResult>,
    );

    /// DEPRECATED: sends a message on the single_message channel.
    ///
    /// Returns a [`SendHandle`] for the send operation.
    extern "C" fn send_single_message(
        &self,
        to: PeerPubkey,
        data: Payload,
        reliability: ReliabilityMode,
    ) -> SendHandle;

    /// DEPRECATED: replaces the current single message callback by the one provided.
    ///
    /// Replacing the callback by `None` is valid.
    ///
    /// The previous callback will be returned by this method.
    extern "C" fn set_recv_single_message_callback(
        &self,
        callback: TaggedOption<Callback<SingleMessage>>,
    ) -> TaggedOption<Callback<SingleMessage>>;
}

/// The identifier of an acceptor in the [`Bus`].
///
/// This is a `#[repr(transparent)]` wrapper around a [`NonZeroU32`](core::num::NonZeroU32), so
/// zero is never a valid identifier.
#[repr(transparent)]
pub struct AcceptorId(pub core::num::NonZeroU32);
// SAFETY: `AcceptorId` is `#[repr(transparent)]` over a `NonZeroU32`, so it is laid out as a
// `u32` for which every value except zero is valid.
unsafe impl ReprC for AcceptorId {
    type CLayout = u32;
    /// A raw `u32` is a valid `AcceptorId` exactly when it is non-zero.
    fn is_valid(raw: &'_ Self::CLayout) -> bool {
        core::num::NonZeroU32::new(*raw).is_some()
    }
}
// SAFETY: zero is the one `u32` value that is never a valid `AcceptorId`, so it is the niche.
unsafe impl __HasNiche__ for AcceptorId {
    /// The niche is the raw value zero.
    fn is_niche(raw: &'_ <Self as ReprC>::CLayout) -> bool {
        core::num::NonZeroU32::new(*raw).is_none()
    }
}
/// Checks that zero is the niche (and invalid), while non-zero values are valid non-niches.
#[test]
fn acceptor_id_layout() {
    let zero = 0u32;
    assert!(AcceptorId::is_niche(&zero));
    assert!(!AcceptorId::is_valid(&zero));
    // Both ends of the non-zero range behave the same way.
    for raw in [1u32, u32::MAX] {
        assert!(!AcceptorId::is_niche(&raw));
        assert!(AcceptorId::is_valid(&raw));
    }
}

/// DEPRECATED: requested reliability level for a message to be transmitted to another peer.
///
/// The enum is `#[repr(u8)]` with implicit discriminants, so the variants are numbered `0`, `1`
/// and `2` in declaration order.
#[derive_ReprC]
#[repr(u8)]
pub enum ReliabilityMode {
    /// No guarantees of successful delivery, ordering, or once-only delivery.
    Unreliable,
    /// Messages will be delivered at most once, in the same order that they are sent,
    /// but there may be gaps.
    UnreliableSequenced,
    /// Every message will be delivered in order or else the connection fails.
    Reliable,
}

/// DEPRECATED: a message received from the single message API.
///
/// This is the argument type of the callback installed through
/// [`IBus::set_recv_single_message_callback`].
#[derive_ReprC]
#[repr(C)]
pub struct SingleMessage {
    /// The source of the message.
    pub source: PeerPubkey,
    /// The payload of the message.
    pub payload: Payload,
}

/// An open, bidirectional stream.
///
/// This is a `#[repr(C)]` wrapper around a virtual pointer to an [`IStream`] implementation.
/// As the `must_use` message states, dropping a stream closes the connection.
#[derive_ReprC]
#[repr(C)]
#[must_use = "Dropping a stream closes the connection."]
pub struct Stream {
    /// The type-erased [`IStream`] implementation, required to be `Send + Sync`.
    pub inner: VirtualPtr<dyn IStream + Send + Sync>,
}
#[derive_ReprC(dyn)]
/// An open, bidirectional stream.
///
/// This trait is the interface used behind [`Stream`] as a `dyn IStream`.
pub trait IStream {
    /// The stream's information.
    extern "C" fn info(&self) -> StreamInfo<'_>;

    /// Send a payload.
    ///
    /// The returned [`SendHandle`] can be used to cancel the send (provided it hasn't already
    /// been delivered). The send is considered complete immediately for unreliable streams,
    /// upon ACK otherwise.
    // NOTE(review): the previous documentation mentioned a `finished` callback, which is not part
    // of this signature, and ended mid-sentence ("If the send failed (only allowed for
    // cancelations), the payload"); the intended wording should be confirmed.
    extern "C" fn send(&self, payload: Payload) -> SendHandle;

    /// Replaces the current recv callback.
    ///
    /// This can also be used to remove the existing one or extract it to concatenate callbacks:
    /// the previously installed callback (if any) is the return value.
    extern "C" fn set_recv_callback(
        &self,
        on_recv: TaggedOption<Callback<Payload>>,
    ) -> TaggedOption<Callback<Payload>>;

    /// Returns the stream's closer if the stream was indeed closed.
    extern "C" fn is_closed(&self) -> TaggedOption<StreamClosedBy>;

    /// Adds a `continuation` on the stream's closure.
    ///
    /// All continuations added to the stream's closure will be called once it is closed.
    ///
    /// The `continuation` is guaranteed to be called if the stream was already closed.
    extern "C" fn add_on_close(&self, continuation: Continuation<'static, StreamClosedBy>);
}

/// A handle on a send operation, allowing to await its completion or attempt to cancel it.
///
/// If dropped, the handle MUST neither `cancel` nor await the operation, but simply detach.
///
/// This is a `#[repr(transparent)]` wrapper around a virtual pointer to an [`ISendHandle`]
/// implementation.
#[derive_ReprC]
#[repr(transparent)]
pub struct SendHandle(pub VirtualPtr<dyn ISendHandle + Send + Sync>);
/// A handle on a send operation, allowing to await its completion or attempt to cancel it.
///
/// If dropped, the handle MUST neither `cancel` nor await the operation, but simply detach.
#[derive_ReprC(dyn, Clone)]
pub trait ISendHandle {
    /// Attempts to cancel the send operation, reporting the outcome as a
    /// [`CancellationResult`].
    ///
    /// This cancellation may fail to cancel the operation, even if the message is still in a
    /// send queue.
    ///
    /// If a continuation has been passed to `Self::then`, and hasn't been called yet, it _must_ be
    /// called with the current result of `Self::poll`.
    // NOTE(review): this trait has no `then` method; the paragraph above presumably refers to
    // `set_on_change` — confirm. The previous wording also said the message payload is returned
    // on success, but `CancellationResult::Ok` carries no payload.
    extern "C" fn cancel(&self) -> CancellationResult;

    /// Returns the current status of the operation.
    ///
    /// Note that consistency isn't guaranteed: calling `poll` after `then` may yield a
    /// different result than that yielded to the continuation. In such a case, the
    /// continuation's argument is considered "canon".
    // NOTE(review): `then` is not a method of this trait; presumably `set_on_change` is meant.
    extern "C" fn poll(&self) -> SendStatus;

    /// Calls the `callback` when the send status changes.
    ///
    /// Previously set callbacks MUST:
    /// - Never be called upon a change in the [`SendStatus`] if `keep_previous == false` once this
    ///   method returns.
    /// - Also be called upon a change in the [`SendStatus`] if `keep_previous == true`, without any
    ///   call order being specified.
    extern "C" fn set_on_change(&self, callback: Callback<SendStatus>, keep_previous: bool);
}

/// The information about a stream (or stream candidate).
///
/// This is a `#[repr(transparent)]` wrapper around a virtual pointer to an [`IStreamInfo`]
/// implementation that may borrow for `'a`.
#[derive_ReprC]
#[repr(transparent)]
pub struct StreamInfo<'a>(pub VirtualPtr<dyn IStreamInfo + 'a>);
/// The information about a stream (or stream candidate).
///
/// This trait is the interface used behind [`StreamInfo`] as a `dyn IStreamInfo`.
#[derive_ReprC(dyn)]
pub trait IStreamInfo {
    /// The peer with which the stream is/will be connected.
    extern "C" fn peer_pubkey(&self) -> PeerPubkey;

    /// The topic of the stream.
    ///
    /// The return value _may_ alias `self`. You can ensure it's fully owned by calling
    /// [`Bytes::upgrade`](safer_ffi::bytes::Bytes::upgrade) which will copy only if necessary.
    extern "C" fn topic(&self) -> safer_ffi::bytes::Bytes<'_>;
}

/// Identifies whether the stream was closed locally or remotely.
#[derive_ReprC]
#[repr(u8)]
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum StreamClosedBy {
    /// The stream was closed by the remote peer.
    Remote,
    /// The stream was closed locally.
    Local,
}

/// The result of attempting to connect.
///
/// The enum is `#[repr(C, u8)]` with explicit tags: `1` for [`Accepted`](Self::Accepted) and `0`
/// for [`Rejected`](Self::Rejected). Its `ReprC` implementation is written by hand in the `seal`
/// module rather than derived.
#[repr(C, u8)]
pub enum ConnectionResult {
    /// Upon success, the stream is yielded.
    Accepted(Stream) = 1,
    /// Upon failure, the reason for that failure is yielded.
    Rejected(ConnectionError) = 0,
}

/// Hand-written `ReprC` implementations for the `#[repr(C, u8)]` result enums
/// [`ConnectionResult`] and [`CancellationResult`], expressed through helper structs whose
/// first field plays the role of the enum tag.
mod seal {
    use super::*;
    /// A safer-ffi friendly equivalent layout to [`ConnectionResult`].
    #[derive_ReprC]
    #[repr(C)]
    pub struct ConnectionResult_Layout {
        /// `true` if accepted, `false` if rejected.
        pub accepted: bool,
        /// Initialized iff `accepted` is `true`;
        /// if not, its storage must hold a valid [`ConnectionError`] instead.
        pub stream: Stream,
    }
    // SAFETY: `ConnectionResult` is `#[repr(C, u8)]` with tags `Accepted = 1` / `Rejected = 0`,
    // i.e. a one-byte tag followed by the payload, which is what `ConnectionResult_Layout`
    // spells out (with `accepted` standing for the tag).
    unsafe impl ReprC for ConnectionResult {
        type CLayout = CLayoutOf<ConnectionResult_Layout>;
        /// Validates the tag first, then only the payload selected by that tag.
        ///
        /// Checking the tag first matters for rejected values: their payload storage holds a
        /// [`ConnectionError`], so it must never be accepted (or even inspected) as a `Stream`.
        fn is_valid(it: &'_ Self::CLayout) -> bool {
            if !bool::is_valid(&it.accepted) {
                return false;
            }
            // SAFETY: `it.accepted` has just been checked to be a valid `bool`.
            let accepted = unsafe { core::mem::transmute::<_, bool>(it.accepted) };
            if accepted {
                // The payload is a `Stream`: the helper struct's own validation applies as is.
                ConnectionResult_Layout::is_valid(it)
            } else {
                // SAFETY: for a rejected result the payload storage starts with the
                // `ConnectionError`, so reinterpreting the reference reads that error only.
                ConnectionError::is_valid(unsafe {
                    core::mem::transmute::<&Stream_Layout, &ConnectionError_Layout>(&it.stream)
                })
            }
        }
    }
    #[test]
    fn connection_result_layout() {
        for expected in [
            ConnectionResult::Rejected(ConnectionError::PeerNotFound),
            ConnectionResult::Rejected(ConnectionError::TopicRejected),
        ] {
            assert!(ConnectionResult::is_valid(&unsafe {
                core::mem::transmute(expected)
            }))
        }
        // A rejected result carrying an out-of-range error discriminant must be refused.
        let mut raw: CLayoutOf<ConnectionResult> = unsafe {
            core::mem::transmute(ConnectionResult::Rejected(ConnectionError::PeerNotFound))
        };
        // SAFETY: the pointer targets the first byte of `raw.stream`, which is in bounds.
        unsafe { core::ptr::addr_of_mut!(raw.stream).cast::<u8>().write(2) };
        assert!(!ConnectionResult::is_valid(&raw));
    }
    /// A safer-ffi friendly equivalent layout to [`CancellationResult`].
    #[derive_ReprC]
    #[repr(C)]
    pub struct CancellationResult_Layout {
        /// `true` if cancelling succeeded, `false` otherwise.
        pub success: bool,
        /// Only meaningful iff `success` is `false`, in which case it must be a valid
        /// [`CancellationError`]; when `success` is `true` its content is ignored.
        pub error_reason: CancellationError,
    }
    // SAFETY: `CancellationResult` is `#[repr(C, u8)]` with tags `Ok = 1` / `Err = 0`, i.e. a
    // one-byte tag followed by the `CancellationError` payload, which is what
    // `CancellationResult_Layout` spells out (with `success` standing for the tag).
    unsafe impl ReprC for CancellationResult {
        type CLayout = CLayoutOf<CancellationResult_Layout>;
        /// Validates the tag first; the error payload is only checked for failed cancellations.
        fn is_valid(it: &'_ Self::CLayout) -> bool {
            if !bool::is_valid(&it.success) {
                return false;
            }
            // SAFETY: `it.success` has just been checked to be a valid `bool`.
            let success = unsafe { core::mem::transmute::<_, bool>(it.success) };
            // `Ok` has no payload, so any content of `error_reason` is acceptable on success.
            success || CancellationResult_Layout::is_valid(it)
        }
    }
    #[test]
    fn cancellation_result_layout() {
        for expected in [
            CancellationResult::Ok,
            CancellationResult::Err(CancellationError::CancellationFailed),
        ] {
            assert!(CancellationResult::is_valid(&unsafe {
                core::mem::transmute(expected)
            }))
        }
        assert!(CancellationResult::is_valid(&unsafe {
            core::mem::transmute::<[bool; 2], CancellationResult_Layout_Layout>([false, false])
        }));
        assert!(CancellationResult::is_valid(&unsafe {
            core::mem::transmute::<[bool; 2], CancellationResult_Layout_Layout>([true, false])
        }));
        assert!(!CancellationResult::is_valid(&unsafe {
            core::mem::transmute::<[u8; 2], CancellationResult_Layout_Layout>([2, 0])
        }));
        assert!(!CancellationResult::is_valid(&unsafe {
            core::mem::transmute::<[u8; 2], CancellationResult_Layout_Layout>([2, 1])
        }));
        assert!(!CancellationResult::is_valid(&unsafe {
            core::mem::transmute::<[u8; 2], CancellationResult_Layout_Layout>([0, 1])
        }));
    }
}

/// The reason for a connection attempt's failure.
///
/// The enum is `#[repr(u8)]` with implicit discriminants (`0` and `1` in declaration order); it
/// is the payload of [`ConnectionResult::Rejected`].
#[derive_ReprC]
#[repr(u8)]
#[derive(thiserror::Error, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, Copy)]
pub enum ConnectionError {
    /// The peer you attempted to connect to was not found or failed to respond in time.
    #[error("The peer you attempted to connect to was not found or failed to respond in time.")]
    PeerNotFound,
    /// The peer you attempted to connect to was found, but refused your topic.
    #[error("The peer you attempted to connect to was found, but refused your topic.")]
    TopicRejected,
}

/// The reason for a cancellation attempt's failure.
///
/// This is the payload of [`CancellationResult::Err`].
#[derive_ReprC]
#[repr(u8)]
#[derive(thiserror::Error, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, Copy)]
pub enum CancellationError {
    /// Cancelling the send operation failed.
    #[error("Cancelling the send operation failed.")]
    CancellationFailed,
}
/// An FFI-safe `Result<(), CancellationError>`.
///
/// The enum is `#[repr(C, u8)]` with explicit tags: `1` for [`Ok`](Self::Ok) and `0` for
/// [`Err`](Self::Err). Conversions from and to `Result<(), CancellationError>` are provided
/// through `From`.
#[repr(C, u8)]
pub enum CancellationResult {
    /// Cancelling the send operation was successful.
    ///
    /// This variant carries no data.
    // NOTE(review): the previous documentation stated that the send operation's payload is
    // returned if possible; no payload is carried by this variant — confirm the intended design.
    Ok = 1,
    /// Cancelling the send operation was unsuccessful.
    Err(CancellationError) = 0,
}
/// Converts a standard `Result` into its FFI-safe counterpart.
impl From<Result<(), CancellationError>> for CancellationResult {
    /// Maps `Ok(())` to [`CancellationResult::Ok`] and `Err(e)` to [`CancellationResult::Err`].
    fn from(value: Result<(), CancellationError>) -> Self {
        if let Err(reason) = value {
            Self::Err(reason)
        } else {
            Self::Ok
        }
    }
}
/// Converts the FFI-safe result back into a standard `Result`.
impl From<CancellationResult> for Result<(), CancellationError> {
    /// Maps [`CancellationResult::Ok`] to `Ok(())` and [`CancellationResult::Err`] to `Err(e)`.
    fn from(value: CancellationResult) -> Self {
        if let CancellationResult::Err(reason) = value {
            return Err(reason);
        }
        Ok(())
    }
}

/// Indicates the status of a given send operation.
///
/// A [`SendHandle`]'s state may become [`Unknown`](SendStatus::Unknown) at any time,
/// but may never return to another state after that.
///
/// Transition details:
/// - `Pending -> Sent`: Trivial, each state is technically allowed to represent the other.
/// - `Pending -> Failed`: This likely indicates that the stream has been closed before the message
///   could be sent.
///
/// The enum is `#[repr(u8)]` with explicit discriminants `0` through `4`.
#[derive_ReprC]
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[repr(u8)]
pub enum SendStatus {
    /// The send status couldn't be recovered.
    ///
    /// Transition to this state may happen from any state (including "final" states),
    /// as the status may stop being tracked.
    Unknown = 0,
    /// The send operation is still pending.
    ///
    /// Note this doesn't guarantee that the message _hasn't_ been sent or acknowledged yet,
    /// just that the handle wasn't able to confirm that either of these states has been
    /// reached.
    Pending = 1,
    /// The send operation has been executed, but no acknowledgement has been received by the
    /// handle.
    ///
    /// Note that an acknowledgement might still be on the way, or that the handle is still
    /// expected to switch to `SentNeverAck` if no acknowledgement is expected.
    // NOTE(review): this enum has no `SentNeverAck` variant, so the former intra-doc link to it
    // could not resolve; confirm which state is meant.
    Sent = 2,
    /// The send was identified as having failed.
    ///
    /// This typically would happen if the target disconnected before reaching one of the other end
    /// states.
    Failed = 3,
    /// The send was cancelled.
    ///
    /// Note that this does not guarantee that the message won't be delivered, even if it hadn't
    /// been sent yet.
    Cancelled = 4,
}

/// Lets a plain [`SendStatus`] act as a send handle whose status never changes.
impl ISendHandle for SendStatus {
    /// Cancellation always fails with [`CancellationError::CancellationFailed`].
    extern "C" fn cancel(&self) -> CancellationResult {
        let reason = CancellationError::CancellationFailed;
        CancellationResult::Err(reason)
    }
    /// Returns a copy of the status itself.
    extern "C" fn poll(&self) -> SendStatus {
        let status = *self;
        status
    }
    /// Invokes `callback` once, immediately, with the status; `_keep_previous` is ignored.
    extern "C" fn set_on_change(
        &self,
        mut callback: Callback<SendStatus>,
        _keep_previous: bool,
    ) {
        let status = *self;
        callback.call(status);
    }
}

impl core::fmt::Debug for SendStatus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let s = match self {
            SendStatus::Unknown => "SendStatus::Unknown",
            SendStatus::Pending => "SendStatus::Pending",
            SendStatus::Sent => "SendStatus::Sent",
            SendStatus::Failed => "SendStatus::Failed",
            SendStatus::Cancelled => "SendStatus::Cancelled",
        };
        f.write_str(s)
    }
}
impl core::fmt::Display for SendStatus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let s = match self {
            SendStatus::Unknown => "Unknown",
            SendStatus::Pending => "Pending",
            SendStatus::Sent => "Sent",
            SendStatus::Failed => "Failed",
            SendStatus::Cancelled => "Cancelled",
        };
        f.write_str(s)
    }
}