taps 0.2.2

taps (Tokio Asynchronous Pub/Sub) is an in-process async message broker that can be used for messaging between spawned tokio tasks.
Documentation
//! This module demonstrates a simple RPC (Remote Procedure Call) setup using the `taps` library.
//!
//! The `taps` library provides a basic pub-sub (publish-subscribe) mechanism, which is used here to facilitate RPC communication.
//!
//! # Architecture
//!
//! - **RPC Client**: Sends a request and waits for a response.
//! - **RPC Server**: Listens for requests, processes them, and sends a response back.
//!
//! Both the client and server use the `taps` library's `Client` type to communicate.
//!
//! # Data Structures
//!
//! - `RpcRequest`: Enum representing the different types of requests the server can handle.
//! - `RpcResponse`: Enum representing the potential responses from the server.
//! - `RpcMessage`: Struct that encapsulates an `RpcRequest` or `RpcResponse` with a unique ID. This ID is used to match responses to their corresponding requests.
//! - `MessageContent`: Enum that can hold either a request or a response.
//!
//! # Flow
//!
//! 1. The main function initializes the broker from the `taps` library.
//! 2. Two tasks are spawned representing the RPC client and the RPC server.
//! 3. The client subscribes to a topic to receive its response. The topic is unique for each client request, derived from a UUID.
//! 4. The client sends an `RpcRequest` wrapped in an `RpcMessage` to the `rpc_requests` topic.
//! 5. The server is subscribed to the `rpc_requests` topic and picks up the client's request.
//! 6. After processing the request, the server sends an `RpcResponse` wrapped in an `RpcMessage` to the response topic specific to the client's request UUID.
//! 7. The client receives the response on its unique topic and processes it.
//!
//! # Usage
//!
//! Running this module's `main` function will initiate the RPC flow. The client sends a request to compute the square of a number, and the server responds with the result.
use log::{error, info};
use taps::{Broker, Client};
use tokio::sync::mpsc;
use uuid::Uuid;

// Define the RPC message structures
#[derive(Debug, Clone)]
enum RpcRequest {
    ComputeSquare(i32),
}

#[derive(Debug, Clone)]
enum RpcResponse {
    SquareResult(i32),
}

#[derive(Clone)]
struct RpcMessage {
    id: Uuid,
    content: MessageContent,
}

#[derive(Clone)]
enum MessageContent {
    Request(RpcRequest),
    Response(RpcResponse),
}

#[tokio::main]
async fn main() {
    env_logger::init();

    info!("Starting the broker...");

    let mut broker = Broker::new();
    let (broker_tx, broker_rx) = mpsc::channel(32);
    let broker_tx = broker_tx.clone();

    tokio::spawn(async move {
        broker.run(broker_rx).await;
    });
    info!("Broker started!");

    let mut worker_client = Client::new(broker_tx.clone());
    let worker_task = tokio::spawn(async move {
        let client_uuid = Uuid::new_v4();
        let response_topic = format!("rpc_responses_{}", client_uuid);

        worker_client.subscribe(response_topic.clone()).await;

        let request = RpcMessage {
            id: client_uuid,
            content: MessageContent::Request(RpcRequest::ComputeSquare(5)),
        };

        worker_client
            .publish("rpc_requests".to_string(), request)
            .await;
        info!("RPC Client sent a request.");

        if let Some(response) = worker_client.receive(&response_topic).await {
            match response.content {
                MessageContent::Response(RpcResponse::SquareResult(value)) => {
                    info!("RPC Client received a response: {}", value);
                }
                _ => error!("Unexpected message content."),
            }
        } else {
            error!("RPC Client failed to receive a response.");
        }
    });

    let mut rpc_client = Client::new(broker_tx.clone());
    let rpc_task = tokio::spawn(async move {
        rpc_client.subscribe("rpc_requests".to_string()).await;
        info!("RPC Server started and subscribed to 'rpc_requests' topic.");

        if let Some(request) = rpc_client.receive("rpc_requests").await {
            match request.content {
                MessageContent::Request(RpcRequest::ComputeSquare(value)) => {
                    // Simulate some delay or processing time in the server
                    tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

                    let result = value * value;
                    let response = RpcMessage {
                        id: request.id,
                        content: MessageContent::Response(RpcResponse::SquareResult(result)),
                    };
                    let response_topic = format!("rpc_responses_{}", request.id);
                    rpc_client.publish(response_topic, response).await;
                    info!("RPC Server processed a request and sent a response.");
                }
                _ => error!("Unexpected message content."),
            }
        } else {
            error!("RPC Server failed to receive a request.");
        }
    });

    let _ = tokio::join!(worker_task, rpc_task);
}