// tokio_ipc 0.1.0
//
// Multi-protocol RPC framework built on top of tokio.
//
// Documentation:
// Basic RPC example
//
// Usage:
//   Terminal 1 (Server): cargo run --example basic -- --server
//   Terminal 2 (Client): cargo run --example basic -- --client
//   No arguments:        cargo run --example basic   (prints usage and exits with status 1)
//
// This example shows:
// - Server calling methods on the client
// - Client calling methods on the server
// - Full duplex communication
// - Push notifications from server to client

use anyhow::Result;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_ipc::{RpcClient, RpcServer, RpcServerHandler};
use tokio_socket::SocketAddr;

// Server protocol - what clients call on the server.
//
// The `protocol!` invocation produces `pub mod server_protocol`. Elsewhere in
// this file that module is used through:
//   - `server_protocol::Receive`: the handler trait the server implements,
//     with one async method per entry below;
//   - `server_protocol::Sender`: the trait the client imports to call these
//     methods on its sender;
//   - `server_protocol::<method>::Response`: a struct carrying the fields
//     listed after `->`.
tokio_ipc::protocol! {
    pub mod server_protocol {
        // register(client_name) -> client_id
        register {
            client_name: String,
        } -> {
            client_id: u64,
        };

        // send_message(message) -> success
        send_message {
            message: String,
        } -> {
            success: bool,
        };
    }
}

// Client protocol - what server calls on clients.
//
// The `protocol!` invocation produces `pub mod client_protocol`. Elsewhere in
// this file the client implements `client_protocol::Receive`, and the server
// imports `client_protocol::Sender` to call these methods on a connection.
tokio_ipc::protocol! {
    pub mod client_protocol {
        // No `->` section: the client-side handler for `notify` returns
        // `Result<()>` rather than a response struct.
        notify {
            message: String,
        };

        // No request fields: the handler takes only `&self` and answers with
        // `client_protocol::ping::Response { pong }`.
        ping -> {
            pong: String,
        };
    }
}

mod server {
    use super::*;
    use client_protocol::Sender as _;

    // Generate sender/receiver types.
    //
    // `ServerReceiver` wraps a `ServerHandler` (built with `ServerReceiver::new`)
    // and is what `Server::on_rpc_connect` returns as its `ReceiveRpc`.
    // `ClientSender` is the `SendRpc` handle; with `client_protocol::Sender` in
    // scope it exposes `notify` and `ping` for calling back into the client.
    tokio_ipc::protocol_handler!(ServerReceiver impl [server_protocol] with ServerHandler);
    tokio_ipc::protocol_sender!(ClientSender impl [client_protocol]);

    /// Per-connection request handler for `server_protocol`.
    ///
    /// One instance is built in `Server::on_rpc_connect` for each connection.
    /// It shares the ID counter with the `Server` and holds a sender for
    /// calling back into that connection's client.
    #[derive(Clone)]
    struct ServerHandler {
        /// Last client ID handed out; the same counter is cloned into every handler.
        next_id: Arc<RwLock<u64>>,
        /// Sender used to invoke `client_protocol` methods on the connected client.
        client_sender: ClientSender,
    }

    impl server_protocol::Receive for ServerHandler {
        /// Assigns the next client ID, sends the client a welcome notification,
        /// and returns the ID.
        ///
        /// # Errors
        /// Propagates a failure of the welcome `notify` call.
        async fn register(
            &self,
            client_name: String,
        ) -> Result<server_protocol::register::Response> {
            // Allocate the ID in its own scope so the write guard is released
            // before the notification is awaited. Holding it across the await
            // would block every other registration while this client's
            // notification is in flight.
            let client_id = {
                let mut next_id = self.next_id.write().await;
                *next_id += 1;
                *next_id
            };

            println!(
                "[SERVER] Client registered: {} (ID: {})",
                client_name, client_id
            );

            // Send welcome notification to client
            self.client_sender
                .notify(format!(
                    "Welcome {}! You are client #{}",
                    client_name, client_id
                ))
                .await?;

            Ok(server_protocol::register::Response { client_id })
        }

        /// Logs the received message, echoes it back to the client as a
        /// notification, and reports success.
        ///
        /// # Errors
        /// Propagates a failure of the echo `notify` call.
        async fn send_message(
            &self,
            message: String,
        ) -> Result<server_protocol::send_message::Response> {
            println!("[SERVER] Received message: {}", message);

            // Echo back via notification
            self.client_sender
                .notify(format!("Server received: {}", message))
                .await?;

            Ok(server_protocol::send_message::Response { success: true })
        }
    }

    /// Connection-level handler passed to `RpcServer::bind_unix`.
    ///
    /// Builds one `ServerHandler` per connection, all sharing `next_id`.
    struct Server {
        /// Last client ID handed out; cloned into every `ServerHandler`.
        next_id: Arc<RwLock<u64>>,
    }

    impl RpcServerHandler for Server {
        type ReceiveRpc = ServerReceiver;
        type SendRpc = ClientSender;

        /// Builds the receiver for a new connection and schedules a ping to
        /// that client, showing that the server can call client methods.
        async fn on_rpc_connect(&self, client_sender: &Self::SendRpc) -> Self::ReceiveRpc {
            println!("[SERVER] Client connected");

            // Server can now call methods on the client!
            // The ping runs in a detached task so that returning the receiver
            // does not wait for the delay or the round trip.
            let ping_sender = client_sender.clone();
            tokio::spawn(async move {
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                match ping_sender.ping().await {
                    Ok(response) => println!("[SERVER] Got pong: {}", response.pong),
                    // Report the failure instead of dropping it silently; the
                    // task is best-effort, so the error is not propagated.
                    Err(err) => eprintln!("[SERVER] Ping failed: {err}"),
                }
            });

            ServerReceiver::new(ServerHandler {
                next_id: self.next_id.clone(),
                client_sender: client_sender.clone(),
            })
        }

        /// Logs the disconnect; the peer ID is not used.
        async fn on_rpc_disconnect(&self, _peer_id: &str) {
            println!("[SERVER] Client disconnected");
        }
    }

    /// Binds the RPC server to the abstract Unix socket `rpc-basic`, then
    /// waits for Ctrl+C before returning.
    ///
    /// # Errors
    /// Returns an error if binding fails or the Ctrl+C signal cannot be awaited.
    pub async fn run_server() -> Result<()> {
        println!("\n=== Basic RPC Server ===\n");

        let socket = SocketAddr::abstract_uds("rpc-basic");
        println!("[SERVER] Starting on abstract socket: rpc-basic");

        // ID counter starts at zero and is shared by every connection.
        let handler = Server {
            next_id: Arc::new(RwLock::new(0)),
        };

        // The binding is held until this function returns.
        // NOTE(review): presumably dropping it stops the server — confirm in tokio_ipc.
        let _server = RpcServer::bind_unix(&socket, handler)?;

        println!("[SERVER] Ready. Press Ctrl+C to stop.\n");
        tokio::signal::ctrl_c().await?;

        println!("\n[SERVER] Shutting down...");
        Ok(())
    }
}

mod client {
    use super::*;
    use server_protocol::Sender as _;

    // Generate sender/receiver types.
    //
    // `ServerSender` is the sender type of the `RpcClient`; with
    // `server_protocol::Sender` in scope it exposes `register` and
    // `send_message`. `ClientReceiver` wraps a `ClientHandler` (built with
    // `ClientReceiver::new`) and is passed to `RpcClient::connect` so the
    // server can call back into this client.
    tokio_ipc::protocol_sender!(ServerSender impl [server_protocol]);
    tokio_ipc::protocol_handler!(ClientReceiver impl [client_protocol] with ClientHandler);

    /// Stateless handler for the calls the server makes on this client.
    #[derive(Clone)]
    struct ClientHandler;

    impl client_protocol::Receive for ClientHandler {
        /// Prints a notification pushed by the server.
        async fn notify(&self, message: String) -> Result<()> {
            println!("[CLIENT] Notification: {}", message);
            Ok(())
        }

        /// Answers a server ping with a fixed pong string.
        async fn ping(&self) -> Result<client_protocol::ping::Response> {
            println!("[CLIENT] Received ping from server");

            let pong = String::from("Pong from client!");
            Ok(client_protocol::ping::Response { pong })
        }
    }

    /// Connects to the `rpc-basic` abstract socket, registers as "Alice",
    /// sends one message, then returns.
    ///
    /// The short sleeps between steps leave time for server-initiated calls
    /// (ping, notifications) to arrive and be printed.
    ///
    /// # Errors
    /// Returns an error if connecting or either RPC call fails.
    pub async fn run_client() -> Result<()> {
        use tokio::time::{sleep, Duration};

        println!("\n=== Basic RPC Client ===\n");

        sleep(Duration::from_millis(100)).await;

        let server_addr = SocketAddr::abstract_uds("rpc-basic");
        println!("[CLIENT] Connecting to server...");

        // The receiver is handed over at connect time so the server can call
        // back into this client.
        let receiver = ClientReceiver::new(ClientHandler);
        let client: RpcClient<ServerSender> = RpcClient::connect(server_addr, receiver).await?;
        println!("[CLIENT] Connected! Registering with server...");

        let registration = client.sender.register(String::from("Alice")).await?;
        println!("[CLIENT] Registered with ID: {}\n", registration.client_id);

        sleep(Duration::from_millis(200)).await;

        println!("[CLIENT] Sending message to server...");
        let ack = client
            .sender
            .send_message(String::from("Hello server!"))
            .await?;
        println!("[CLIENT] Message sent: {}\n", ack.success);

        sleep(Duration::from_millis(200)).await;

        println!("[CLIENT] All operations completed!");
        Ok(())
    }
}

/// Entry point: dispatches on the first command-line argument.
///
/// * `--server` runs `server::run_server` and returns its result.
/// * `--client` runs `client::run_client` and returns its result.
/// * Anything else, including no argument, prints usage and exits with
///   status 1. An unrecognized argument is first reported on stderr.
#[tokio::main]
async fn main() -> Result<()> {
    let args: Vec<String> = std::env::args().collect();

    match args.get(1).map(String::as_str) {
        Some("--server") => return server::run_server().await,
        Some("--client") => return client::run_client().await,
        Some(other) => eprintln!("Unknown argument: {other}"),
        None => {}
    }

    // The program name is not guaranteed to be present in the argument list,
    // so fall back to the example's name instead of indexing `args[0]`.
    let program = args.first().map(String::as_str).unwrap_or("basic");
    println!("Usage:");
    println!("  {program} --server    Run as server");
    println!("  {program} --client    Run as client");
    std::process::exit(1);
}