piying 0.1.1

Fault-tolerant Async Actors Built on Tokio
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
//! # Remote Actors in Piying
//!
//! The `remote` module in Piying provides tools for managing distributed actors across nodes,
//! enabling actors to communicate seamlessly in a peer-to-peer (P2P) network. By leveraging
//! the [libp2p](https://libp2p.io) library, Piying allows you to register actors under unique
//! names and send messages between actors on different nodes as though they were local.
//!
//! ## Key Features
//!
//! - **Composable Architecture**: The [`Behaviour`] struct implements libp2p's `NetworkBehaviour`,
//!   allowing seamless integration with existing libp2p applications and other protocols.
//! - **Quick Bootstrap**: The `bootstrap()` and `bootstrap_on()` functions provide one-line
//!   setup for development and simple deployments. They are only compiled when the
//!   `serde-codec` feature is enabled and `rkyv-codec` is not.
//! - **Custom Transport**: The [`run_swarm()`] function accepts a pre-built swarm with any
//!   transport while handling the event loop for you.
//! - **Actor Registration & Discovery**: Actors can be registered under unique names and looked up
//!   across the network using [`RemoteActorRef`](crate::actor::RemoteActorRef).
//! - **Reliable Messaging**: Ensures reliable message delivery between nodes using a combination
//!   of Kademlia DHT for discovery and request-response protocols for communication.
//! - **Modular Design**: Separate [`messaging`] and [`registry`] modules handle different aspects
//!   of distributed actor communication.
//!
//! ## Getting Started
//!
//! For quick prototyping and development:
//!
//! ```ignore
//! use piying::remote;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     // One line to bootstrap a distributed actor system
//!     let peer_id = remote::bootstrap()?;
//!
//!     // Now use actors normally
//!     // actor_ref.register("my_actor").await?;
//!
//!     Ok(())
//! }
//! ```
//!
//! For production deployments with custom configuration:
//!
//! ```ignore
//! use piying::remote;
//! use libp2p::swarm::NetworkBehaviour;
//!
//! #[derive(NetworkBehaviour)]
//! struct MyBehaviour {
//!     piying: remote::Behaviour,
//!     // Add other libp2p behaviors as needed
//! }
//!
//! // Create custom libp2p swarm with full control over
//! // transports, discovery, and protocol composition
//! ```

use std::{
    any,
    collections::HashMap,
    str,
    sync::{Arc, LazyLock},
};

#[cfg(all(feature = "serde-codec", not(feature = "rkyv-codec")))]
use std::error;

use futures::StreamExt;
use libp2p::{PeerId, Swarm, swarm::NetworkBehaviour};
use tokio::sync::Mutex;

use crate::{
    Actor,
    actor::{ActorId, ActorRef, Links, WeakActorRef},
    error::{RegistryError, RemoteSendError},
    mailbox::SignalMailbox,
};

// Fail the build with an explicit message when neither codec feature is enabled,
// instead of letting later code fail with less obvious errors.
#[cfg(not(any(feature = "serde-codec", feature = "rkyv-codec")))]
compile_error!("The `remote` feature requires either `serde-codec` or `rkyv-codec`");

// Public but hidden from the rendered documentation.
#[doc(hidden)]
pub mod _internal;
// Private module; its public items are re-exported wholesale by `pub use behaviour::*` below.
mod behaviour;
pub mod codec;
#[allow(missing_docs)] // rkyv::Archive derive generates undocumented archived types
pub mod messaging;
pub mod registry;
pub mod session;
// Private module; its public items are re-exported wholesale by `pub use swarm::*` below.
mod swarm;
pub mod wire;

pub use behaviour::*;
// Only `applied_protocol` is lifted to this level; the rest of `session` stays under its path.
pub use session::applied_protocol;
pub use swarm::*;

/// Process-wide table of locally registered actors, keyed by `ActorId`.
///
/// The map is created lazily on first access and is guarded by a Tokio async
/// `Mutex`: async callers use `lock().await`, synchronous callers use `try_lock()`.
pub(crate) static REMOTE_REGISTRY: LazyLock<Mutex<HashMap<ActorId, RemoteRegistryActorRef>>> =
    LazyLock::new(|| Mutex::new(HashMap::new()));

/// Insert `actor_ref` into the local `REMOTE_REGISTRY` under the well-known `id`.
///
/// A strong clone of the reference is stored without a name. Any entry already
/// stored under `id` is replaced. Once registered, the actor can be resolved by
/// `RemoteActorRef::for_peer()` without a DHT lookup.
pub async fn register_actor_local<A: Actor>(actor_ref: &ActorRef<A>, id: ActorId) {
    // Build the entry first so the registry lock is only held for the insert itself.
    let registered = RemoteRegistryActorRef::new(actor_ref.clone(), None);
    let mut registry = REMOTE_REGISTRY.lock().await;
    registry.insert(id, registered);
}

/// Remove the entry stored under `id` from the local `REMOTE_REGISTRY`.
///
/// Returns `true` if an entry was present and has been removed. Returns `false`
/// if there was no such entry, or if the registry lock could not be acquired
/// immediately, in which case the registry is left untouched.
pub fn unregister_actor_local(id: &ActorId) -> bool {
    // `try_lock` never waits, which keeps this safe to call from a `Drop` impl.
    let Ok(mut registry) = REMOTE_REGISTRY.try_lock() else {
        return false;
    };
    // Bound to a local so the removed entry is dropped before the lock guard.
    let removed = registry.remove(id);
    removed.is_some()
}

/// A single entry in `REMOTE_REGISTRY`: a type-erased actor reference together
/// with the pieces of it that can be used without knowing the actor type.
pub(crate) struct RemoteRegistryActorRef {
    /// Type-erased strong or weak actor reference; `downcast` recovers the concrete type.
    actor_ref: BoxRegisteredActorRef,
    /// Optional name passed to the constructor (`None` for well-known-id registrations).
    pub(crate) name: Option<Arc<str>>,
    /// Signal mailbox obtained from the actor reference's `weak_signal_mailbox()`.
    pub(crate) signal_mailbox: Box<dyn SignalMailbox>,
    /// Clone of the actor reference's `links`.
    pub(crate) links: Links,
}

impl RemoteRegistryActorRef {
    /// Creates a registry entry that holds a strong `ActorRef<A>`.
    ///
    /// The signal mailbox and links are captured from `actor_ref` before it is
    /// boxed and type-erased.
    pub(crate) fn new<A: Actor>(actor_ref: ActorRef<A>, name: Option<Arc<str>>) -> Self {
        // Field initialisers are evaluated in the order written, so the two
        // borrowing reads happen before `actor_ref` is moved into the box.
        Self {
            signal_mailbox: actor_ref.weak_signal_mailbox(),
            links: actor_ref.links.clone(),
            actor_ref: BoxRegisteredActorRef::Strong(Box::new(actor_ref)),
            name,
        }
    }

    /// Creates a registry entry that holds a `WeakActorRef<A>`.
    ///
    /// Mirrors [`Self::new`], but the stored reference has to be upgraded
    /// again when the entry is downcast.
    pub(crate) fn new_weak<A: Actor>(actor_ref: WeakActorRef<A>, name: Option<Arc<str>>) -> Self {
        // Same ordering requirement as in `new`: read first, move last.
        Self {
            signal_mailbox: actor_ref.weak_signal_mailbox(),
            links: actor_ref.links.clone(),
            actor_ref: BoxRegisteredActorRef::Weak(Box::new(actor_ref)),
            name,
        }
    }

    /// Recovers a strong, typed `ActorRef<A>` from the type-erased entry.
    ///
    /// # Errors
    ///
    /// - `BadActorType` if the stored reference is not for actor type `A`.
    /// - `ActorNotRunning` if the entry is weak and can no longer be upgraded.
    pub(crate) fn downcast<A: Actor>(
        &self,
    ) -> Result<ActorRef<A>, DowncastRegsiteredActorRefError> {
        match &self.actor_ref {
            BoxRegisteredActorRef::Strong(boxed) => boxed
                .downcast_ref::<ActorRef<A>>()
                .cloned()
                .ok_or(DowncastRegsiteredActorRefError::BadActorType),
            BoxRegisteredActorRef::Weak(boxed) => {
                let weak = boxed
                    .downcast_ref::<WeakActorRef<A>>()
                    .ok_or(DowncastRegsiteredActorRefError::BadActorType)?;
                weak.upgrade()
                    .ok_or(DowncastRegsiteredActorRefError::ActorNotRunning)
            }
        }
    }
}

/// Reasons why a registry entry could not be turned back into a typed `ActorRef`.
///
/// Derives `Debug` and the comparison traits so the error can be logged,
/// used with `unwrap`/`expect`, and asserted on in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DowncastRegsiteredActorRefError {
    /// The stored reference belongs to a different actor type than the one requested.
    BadActorType,
    /// The entry holds a weak reference that could not be upgraded.
    ActorNotRunning,
}

/// Maps a failed registry downcast onto the `RemoteSendError` variant of the same name.
impl<E> From<DowncastRegsiteredActorRefError> for RemoteSendError<E> {
    fn from(reason: DowncastRegsiteredActorRefError) -> Self {
        // Exhaustive on purpose: a new downcast failure must be mapped explicitly.
        match reason {
            DowncastRegsiteredActorRefError::BadActorType => Self::BadActorType,
            DowncastRegsiteredActorRefError::ActorNotRunning => Self::ActorNotRunning,
        }
    }
}

/// Type-erased actor reference stored in a registry entry.
///
/// Both variants carry the same boxed `Any` type; the variant records whether
/// the boxed value is a strong or a weak reference so it can be downcast to
/// the right concrete type.
pub(crate) enum BoxRegisteredActorRef {
    /// Holds a boxed `ActorRef<A>`.
    Strong(Box<dyn any::Any + Send + Sync>),
    /// Holds a boxed `WeakActorRef<A>`, which must be upgraded before use.
    Weak(Box<dyn any::Any + Send + Sync>),
}

/// `RemoteActor` is a trait for identifying actors remotely.
///
/// Each remote actor must implement this trait and provide a unique identifier string (`REMOTE_ID`).
/// The identifier is essential to distinguish between different actor types during remote communication.
/// Because it is an associated constant, the identifier is fixed at compile time for each
/// implementing type.
///
/// ## Example with Derive
///
/// ```
/// use piying::{Actor, RemoteActor};
///
/// #[derive(Actor, RemoteActor)]
/// pub struct MyActor;
/// ```
///
/// ## Example Manual Implementation
///
/// ```
/// use piying::remote::RemoteActor;
///
/// pub struct MyActor;
///
/// impl RemoteActor for MyActor {
///     const REMOTE_ID: &'static str = "my_actor_id";
/// }
/// ```
pub trait RemoteActor {
    /// The remote identifier string.
    const REMOTE_ID: &'static str;
}

/// `RemoteMessage` is a trait for identifying messages that are sent between remote actors.
///
/// Each remote message type must implement this trait and provide a unique identifier string (`REMOTE_ID`).
/// The unique ID ensures that each message type is recognized correctly during message passing between nodes.
///
/// This trait is typically implemented automatically with the [`#[remote_message]`](crate::remote_message) macro.
///
/// NOTE(review): `M` is presumably the message type, with the trait implemented once per
/// message an actor accepts remotely; confirm against the `#[remote_message]` macro expansion.
pub trait RemoteMessage<M> {
    /// The remote identifier string.
    const REMOTE_ID: &'static str;
}

/// Start a ready-made actor swarm for local development and return its peer id.
///
/// This is [`bootstrap_on`] with a default listen address. The resulting swarm has:
/// - TCP and QUIC transports configured
/// - mDNS peer discovery (local network only)
/// - A TCP listener on all IPv4 interfaces with an OS-assigned port
///
/// Note that the default listen address is TCP-only, even though a QUIC
/// transport is configured as well.
///
/// Only compiled when the `serde-codec` feature is enabled and `rkyv-codec` is
/// not (uses CBOR for transport encoding).
///
/// For production use or custom configuration, use `piying::remote::Behaviour`
/// with your own libp2p swarm setup.
///
/// # Errors
///
/// Propagates any error returned by [`bootstrap_on`].
///
/// # Example
/// ```ignore
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     // One line to get started!
///     remote::bootstrap()?;
///
///     // Now use remote actors normally
///     let actor_ref = MyActor::spawn_default();
///     actor_ref.register("my_actor").await?;
///     Ok(())
/// }
/// ```
#[cfg(all(feature = "serde-codec", not(feature = "rkyv-codec")))]
pub fn bootstrap() -> Result<PeerId, Box<dyn error::Error>> {
    // Port 0 lets the OS choose a free port; 0.0.0.0 covers every IPv4 interface.
    const DEFAULT_LISTEN_ADDR: &str = "/ip4/0.0.0.0/tcp/0";

    bootstrap_on(DEFAULT_LISTEN_ADDR)
}

/// Bootstrap with a specific listen address.
///
/// Builds a libp2p swarm with a new identity, a TCP transport (Noise + Yamux),
/// a QUIC transport, and a behaviour that combines piying's [`Behaviour`] with
/// mDNS discovery. The piying behaviour is installed globally through
/// `try_init_global()`, the swarm starts listening on `addr`, and the swarm's
/// event loop is then moved onto a detached Tokio task.
///
/// Returns the local peer id of the new swarm.
///
/// Only compiled when the `serde-codec` feature is enabled and `rkyv-codec` is not.
///
/// # Errors
///
/// Returns an error if the transports or the behaviour cannot be built, if
/// `try_init_global()` fails, if `addr` fails to parse, or if the swarm cannot
/// listen on the parsed address.
///
/// # Panics
///
/// Calls `tokio::spawn`, which panics when invoked outside of a Tokio runtime.
#[cfg(all(feature = "serde-codec", not(feature = "rkyv-codec")))]
pub fn bootstrap_on(addr: &str) -> Result<PeerId, Box<dyn error::Error>> {
    use libp2p::{SwarmBuilder, mdns, noise, swarm::SwarmEvent, tcp, yamux};

    // Composed behaviour used only by this helper: piying's actor protocols plus mDNS.
    // The derive also generates the `BootstrapBehaviourEvent` enum matched on below.
    #[derive(NetworkBehaviour)]
    struct BootstrapBehaviour {
        piying: Behaviour,
        mdns: mdns::tokio::Behaviour,
    }

    let mut swarm = SwarmBuilder::with_new_identity()
        .with_tokio()
        .with_tcp(
            tcp::Config::default(),
            noise::Config::new,
            yamux::Config::default,
        )?
        .with_quic()
        .with_behaviour(|key| {
            // Both behaviours are created for the peer id derived from the swarm's keypair.
            let local_peer_id = key.public().to_peer_id();
            let piying = Behaviour::new(local_peer_id, messaging::Config::default());
            let mdns = mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id)?;

            Ok(BootstrapBehaviour { piying, mdns })
        })?
        .build();

    // Install the piying behaviour globally before the swarm starts listening.
    swarm.behaviour().piying.try_init_global()?;

    swarm.listen_on(addr.parse()?)?;

    // Copy the peer id out now: the swarm is moved into the spawned task below.
    let local_peer_id = *swarm.local_peer_id();

    // Detached event loop: the join handle is dropped, and the loop has no exit path.
    tokio::spawn(async move {
        loop {
            match swarm.select_next_some().await {
                // Record the address of every peer that mDNS reports.
                SwarmEvent::Behaviour(BootstrapBehaviourEvent::Mdns(mdns::Event::Discovered(
                    list,
                ))) => {
                    for (peer_id, multiaddr) in list {
                        #[cfg(feature = "tracing")]
                        tracing::info!("mDNS discovered a new peer: {peer_id}");
                        swarm.add_peer_address(peer_id, multiaddr);
                    }
                }
                // Disconnect peers whose mDNS record expired.
                SwarmEvent::Behaviour(BootstrapBehaviourEvent::Mdns(mdns::Event::Expired(
                    list,
                ))) => {
                    for (peer_id, _multiaddr) in list {
                        #[cfg(feature = "tracing")]
                        tracing::warn!("mDNS discover peer has expired: {peer_id}");
                        // Best effort: the result of the disconnect is deliberately ignored.
                        let _ = swarm.disconnect_peer_id(peer_id);
                    }
                }
                // This arm only exists with the `tracing` feature; without it the event
                // falls through to the catch-all arm.
                #[cfg(feature = "tracing")]
                SwarmEvent::NewListenAddr { address, .. } => {
                    tracing::info!("ActorSwarm listening on {address}");
                }
                // Every other event is polled only to keep the swarm running.
                _ => {}
            }
        }
    });

    Ok(local_peer_id)
}

/// Drive a caller-built libp2p swarm as the actor swarm event loop.
///
/// This is the most flexible way to use piying's remote actors with custom
/// transports: you assemble the `Swarm` yourself (any transport, encryption,
/// and multiplexing) with [`Behaviour`] as part of your composed
/// `NetworkBehaviour`, and this function takes care of polling it.
///
/// The swarm is moved onto a detached Tokio task that polls it forever and
/// discards every event. The swarm's local peer id is returned.
///
/// # Prerequisites
///
/// Before calling this function, you must:
/// 1. Build a `Swarm` containing [`Behaviour`] in its `NetworkBehaviour`
/// 2. Call [`Behaviour::try_init_global()`] on the piying behaviour
/// 3. Call `swarm.listen_on(addr)` if you want the swarm to accept connections
///
/// # Panics
///
/// Calls `tokio::spawn`, which panics when invoked outside of a Tokio runtime.
///
/// # Example
///
/// ```ignore
/// use piying::remote::{self, codec::PiyingRkyvCodec};
/// use libp2p::{swarm::NetworkBehaviour, noise, tcp, yamux};
///
/// #[derive(NetworkBehaviour)]
/// struct MyBehaviour {
///     piying: remote::Behaviour<PiyingRkyvCodec>,
/// }
///
/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let mut swarm = libp2p::SwarmBuilder::with_new_identity()
///     .with_tokio()
///     .with_tcp(tcp::Config::default(), noise::Config::new, yamux::Config::default)?
///     .with_behaviour(|key| {
///         let peer_id = key.public().to_peer_id();
///         let config = remote::messaging::Config::default();
///         let codec = PiyingRkyvCodec::new(&config);
///         Ok(MyBehaviour {
///             piying: remote::Behaviour::with_codec(peer_id, config, codec),
///         })
///     })?
///     .build();
///
/// swarm.behaviour().piying.try_init_global()?;
/// swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
///
/// let peer_id = remote::run_swarm(swarm);
/// # Ok(())
/// # }
/// ```
pub fn run_swarm<B>(mut swarm: Swarm<B>) -> PeerId
where
    B: NetworkBehaviour + Send + 'static,
    <B as NetworkBehaviour>::ToSwarm: Send,
{
    // Read the id before the swarm is moved into the background task.
    let peer_id = *swarm.local_peer_id();

    let event_loop = async move {
        loop {
            // Events are pulled only to keep the swarm making progress; each is discarded.
            let _ = swarm.select_next_some().await;
        }
    };
    tokio::spawn(event_loop);

    peer_id
}

/// Remove every entry from the local actor registry.
///
/// Handy for test isolation when several tests share one process and therefore
/// one global registry. Once cleared, previously registered actors are no
/// longer found by incoming remote messages or by
/// [`RemoteActorRef::for_peer()`](crate::actor::RemoteActorRef::for_peer) lookups until they are re-registered.
pub async fn clear_registry() {
    let mut registry = REMOTE_REGISTRY.lock().await;
    registry.clear();
}

/// Blocking counterpart of [`clear_registry`].
///
/// Retries [`Mutex::try_lock`] in a loop, calling [`std::thread::yield_now`]
/// between attempts, until the registry lock is obtained. Meant for test
/// harness cleanup that runs outside an async context.
pub fn clear_registry_sync() {
    // Spin until the guard is available; the loop evaluates to the guard itself.
    let mut registry = loop {
        match REMOTE_REGISTRY.try_lock() {
            Ok(guard) => break guard,
            Err(_) => std::thread::yield_now(),
        }
    };
    registry.clear();
}

/// Blocking counterpart of [`register_actor_local`].
///
/// Retries [`Mutex::try_lock`] in a loop, calling [`std::thread::yield_now`]
/// between attempts, until the registry lock is obtained, then stores a strong,
/// unnamed entry for `actor_ref` under `well_known_id`. Meant for pre-run hooks
/// that execute outside an async context (e.g., before the network actor starts
/// blocking).
pub fn register_actor_local_sync<A: Actor>(actor_ref: &ActorRef<A>, well_known_id: ActorId) {
    // Spin until the guard is available; the loop evaluates to the guard itself.
    let mut registry = loop {
        match REMOTE_REGISTRY.try_lock() {
            Ok(guard) => break guard,
            Err(_) => std::thread::yield_now(),
        }
    };
    let entry = RemoteRegistryActorRef::new(actor_ref.clone(), None);
    registry.insert(well_known_id, entry);
}

/// Report whether the local registry currently has an entry for `actor_id`.
///
/// Uses [`Mutex::try_lock`], so it never waits. If the lock is contended the
/// answer is `false` (conservative: the caller should fall back to swarm routing).
pub fn is_registered_locally(actor_id: ActorId) -> bool {
    REMOTE_REGISTRY
        .try_lock()
        .is_ok_and(|registry| registry.contains_key(&actor_id))
}

/// Unregisters the actor registered under `name` within the swarm.
///
/// This will only unregister an actor previously registered by the current node.
///
/// # Errors
///
/// Returns [`RegistryError::SwarmNotBootstrapped`] when `ActorSwarm::get()`
/// yields no swarm.
pub async fn unregister(name: impl Into<Arc<str>>) -> Result<(), RegistryError> {
    let Some(swarm) = ActorSwarm::get() else {
        return Err(RegistryError::SwarmNotBootstrapped);
    };
    swarm.unregister(name.into()).await;
    Ok(())
}