use anyhow::Result;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_ipc::{RpcClient, RpcServer, RpcServerHandler};
use tokio_socket::SocketAddr;
tokio_ipc::protocol! {
pub mod server_protocol {
register {
client_name: String,
} -> {
client_id: u64,
};
send_message {
message: String,
} -> {
success: bool,
};
}
}
tokio_ipc::protocol! {
pub mod client_protocol {
notify {
message: String,
};
ping -> {
pong: String,
};
}
}
mod server {
use super::*;
use client_protocol::Sender as _;
tokio_ipc::protocol_handler!(ServerReceiver impl [server_protocol] with ServerHandler);
tokio_ipc::protocol_sender!(ClientSender impl [client_protocol]);
#[derive(Clone)]
struct ServerHandler {
next_id: Arc<RwLock<u64>>,
client_sender: ClientSender,
}
impl server_protocol::Receive for ServerHandler {
async fn register(
&self,
client_name: String,
) -> Result<server_protocol::register::Response> {
let mut id = self.next_id.write().await;
*id += 1;
let client_id = *id;
println!(
"[SERVER] Client registered: {} (ID: {})",
client_name, client_id
);
self.client_sender
.notify(format!(
"Welcome {}! You are client #{}",
client_name, client_id
))
.await?;
Ok(server_protocol::register::Response { client_id })
}
async fn send_message(
&self,
message: String,
) -> Result<server_protocol::send_message::Response> {
println!("[SERVER] Received message: {}", message);
self.client_sender
.notify(format!("Server received: {}", message))
.await?;
Ok(server_protocol::send_message::Response { success: true })
}
}
struct Server {
next_id: Arc<RwLock<u64>>,
}
impl RpcServerHandler for Server {
type ReceiveRpc = ServerReceiver;
type SendRpc = ClientSender;
async fn on_rpc_connect(&self, client_sender: &Self::SendRpc) -> Self::ReceiveRpc {
println!("[SERVER] Client connected");
tokio::spawn({
let sender = client_sender.clone();
async move {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
if let Ok(response) = sender.ping().await {
println!("[SERVER] Got pong: {}", response.pong);
}
}
});
ServerReceiver::new(ServerHandler {
next_id: self.next_id.clone(),
client_sender: client_sender.clone(),
})
}
async fn on_rpc_disconnect(&self, _peer_id: &str) {
println!("[SERVER] Client disconnected");
}
}
pub async fn run_server() -> Result<()> {
println!("\n=== Basic RPC Server ===\n");
let addr = SocketAddr::abstract_uds("rpc-basic");
println!("[SERVER] Starting on abstract socket: rpc-basic");
let _server = RpcServer::bind_unix(
&addr,
Server {
next_id: Arc::new(RwLock::new(0)),
},
)?;
println!("[SERVER] Ready. Press Ctrl+C to stop.\n");
tokio::signal::ctrl_c().await?;
println!("\n[SERVER] Shutting down...");
Ok(())
}
}
mod client {
use super::*;
use server_protocol::Sender as _;
tokio_ipc::protocol_sender!(ServerSender impl [server_protocol]);
tokio_ipc::protocol_handler!(ClientReceiver impl [client_protocol] with ClientHandler);
#[derive(Clone)]
struct ClientHandler;
impl client_protocol::Receive for ClientHandler {
async fn notify(&self, message: String) -> Result<()> {
println!("[CLIENT] Notification: {}", message);
Ok(())
}
async fn ping(&self) -> Result<client_protocol::ping::Response> {
println!("[CLIENT] Received ping from server");
Ok(client_protocol::ping::Response {
pong: "Pong from client!".to_string(),
})
}
}
pub async fn run_client() -> Result<()> {
println!("\n=== Basic RPC Client ===\n");
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let addr = SocketAddr::abstract_uds("rpc-basic");
println!("[CLIENT] Connecting to server...");
let client_receiver = ClientReceiver::new(ClientHandler);
let client: RpcClient<ServerSender> = RpcClient::connect(addr, client_receiver).await?;
println!("[CLIENT] Connected! Registering with server...");
let reg = client.sender.register("Alice".to_string()).await?;
println!("[CLIENT] Registered with ID: {}\n", reg.client_id);
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
println!("[CLIENT] Sending message to server...");
let msg = client
.sender
.send_message("Hello server!".to_string())
.await?;
println!("[CLIENT] Message sent: {}\n", msg.success);
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
println!("[CLIENT] All operations completed!");
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<()> {
let args: Vec<String> = std::env::args().collect();
if args.len() > 1 {
let arg = args[1].as_str();
if arg == "--server" {
return server::run_server().await;
}
if arg == "--client" {
return client::run_client().await;
}
eprintln!("Unknown argument: {arg}");
}
println!("Usage:");
println!(" {} --server Run as server", args[0]);
println!(" {} --client Run as client", args[0]);
std::process::exit(1);
}