Crate notifier_hub

Crate notifier_hub 

Source
Expand description

§NotifierHub Library

NotifierHub is a library designed to facilitate the creation, subscription, and broadcasting of asynchronous messages through smart channels. It provides utilities for error handling, custom broadcasting strategies, and notifications upon channel creation.

§Examples without thread

The example below demonstrates how to broadcast a message to subscribers of a channel in a single thread and wait for the message to be sent using a timeout:

use notifier_hub::{notifier::NotifierHub, writing_handler::Duration};

#[tokio::main]
async fn main() {
    // Create a new NotifierHub
    let mut hub = NotifierHub::new();

    // Subscribe to a channel and get a receiver
    let mut receiver1 = hub.subscribe(&"channel1", 100);
    // Subscribe to the same channel and get a receiver
    let mut receiver2 = hub.subscribe(&"channel1", 100);

    // Message to broadcast
    let msg = "Message !";

    // Send the message to all subscribers and get a WritingHandler to track the results
    let handler = hub.clone_send(msg.clone(), &"channel1").unwrap();

    // Wait for up to 100 milliseconds for senders to put the message in the channel buffer
    // This ensures the message is sent successfully or times out.
    handler.wait(Some(Duration::from_millis(100))).await.unwrap();

    assert_eq!(&receiver1.recv().await.unwrap(), &msg);
    assert_eq!(&receiver2.recv().await.unwrap(), &msg);
}

§Examples without thread

The example below demonstrates how to use notifier_hub to create a multithreaded notification system, where multiple subscribers listen for messages on different channels.

use notifier_hub::{closable_trait::ClosableMessage, notifier::NotifierHub};
use std::sync::Arc;
use tokio::sync::Mutex;

// This type is going to be sent among the subscribers
#[derive(Clone, Debug)]
enum Message {
   StringMessage(String),
   Number(u32),
   Close,
}

impl ClosableMessage for Message {
   fn get_close_message() -> Self {
       Self::Close
   }
}

// First subscriber
fn subscriber_1(hub: Arc<Mutex<NotifierHub<Message, &'static str>>>) {
   tokio::spawn(async move {
       // Subscribing to "channel1" and "channel2"
       let mut receiver = hub.lock().await
           .subscribe_multiple(&["channel1", "channel2"], 100);
       loop {
           let msg = receiver.recv().await.unwrap();
           match msg {
               Message::StringMessage(s_msg) => {
                   println!("Just received a new message as subscriber_1: {s_msg}")
               }
               Message::Number(n) => println!("Just received a number as subscriber_1: {n}"),
               Message::Close => break,
           }
       }
       // As we are breaking with close, we don't need to unsubscribe
   });
}

// Second subscriber
fn subscriber_2(hub: Arc<Mutex<NotifierHub<Message, &'static str>>>) {
   tokio::spawn(async move {
       // Subscribing only to "channel1"
       let mut receiver = hub.lock().await.subscribe(&"channel1", 100);
       loop {
           let msg = receiver.recv().await.unwrap();
           match msg {
               Message::StringMessage(s_msg) => {
                   println!("Just received a new message as subscriber_2: {s_msg}")
               }
               Message::Number(n) => println!("Just received a number as subscriber_2: {n}"),
               Message::Close => break,
           }
       }
       // As we are breaking with close, we don't need to unsubscribe
   });
}

#[tokio::main]
async fn main() {
   // Create a new NotifierHub wrapped in a mutex
   let hub = Arc::new(Mutex::new(NotifierHub::new()));

   let mut creation_waiter = hub.lock().await.get_creation_waiter(&"channel1");
   let mut destruction_waiter = hub.lock().await.get_destruction_waiter(&"channel1");

   // Start subscriber threads
   subscriber_1(hub.clone());
   subscriber_2(hub.clone());

   // Wait for subscriber_1 and subscriber_2 to be ready to receive messages on "channel1"
   creation_waiter.recv().await.unwrap();
   creation_waiter.recv().await.unwrap();

   {
       let mut hub = hub.lock().await;

       let msg1 = Message::StringMessage("Hello!".to_string());
       // Send the message to subscriber_1 and subscriber_2 as they are both subscribed to "channel1"
       hub.clone_send(msg1, &"channel1").unwrap();

       let msg2 = Message::Number(18);
       // Only subscriber_1 will receive this message as it is subscribed to "channel2"
       hub.clone_send(msg2, &"channel2").unwrap();

       let msg3 = Message::StringMessage("Broadcast message!".to_string());
       // Sends msg3 on all channels, so subscriber_1 will receive it twice
       hub.broadcast_clone(msg3);

       // Send a close message to subscriber_1 and subscriber_2
       hub.shutdown_clone(&"channel1").unwrap();
   }

   // Wait for both subscriber_1 and subscriber_2 to unsubscribe
   destruction_waiter.recv().await;
   destruction_waiter.recv().await;
}

§Modules

This crate is organized into the following modules:

Modules§

closable_trait
error
Contains definitions related to error types and handling.
notifier
Contains the main NotifierHub structure and its associated methods.
writing_handler
Provides the WritingHandler for handling broadcasts in an asynchronous context.

Macros§

unexpected