// message_transport_shm/message_transport_shm.rs
use serde::{Deserialize, Serialize};
4use std::env;
5use xrpc::{
6 Message, MessageChannel, MessageChannelAdapter, SharedMemoryConfig, SharedMemoryFrameTransport,
7 message::types::{CompressionType, MessageType},
8};
9
10#[derive(Debug, Serialize, Deserialize)]
11struct AddRequest {
12 a: i32,
13 b: i32,
14}
15
16#[derive(Debug, Serialize, Deserialize)]
17struct AddResponse {
18 result: i32,
19}
20
21#[derive(Debug, Serialize, Deserialize)]
22struct LogEvent {
23 level: String,
24 message: String,
25}
26
/// Name under which the shared-memory transport is created by the server and
/// looked up by the client; both sides of this example pass the same value.
const SERVICE_NAME: &str = "test_message_channel";
28
29#[tokio::main]
30async fn main() -> Result<(), Box<dyn std::error::Error>> {
31 let args: Vec<String> = env::args().collect();
32 let mode = args.get(1).map(|s| s.as_str()).unwrap_or("server");
33
34 match mode {
35 "server" => run_server().await?,
36 "client" => run_client().await?,
37 _ => {
38 eprintln!("Usage: cargo run --example message_transport_shm -- [server|client]");
39 std::process::exit(1);
40 }
41 }
42
43 Ok(())
44}
45
46async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
47 println!("[Server] Creating shared memory transport");
48
49 let config = SharedMemoryConfig::default();
50 let transport = SharedMemoryFrameTransport::create_server(SERVICE_NAME, config)?;
51 let channel = MessageChannelAdapter::new(transport);
52
53 println!("[Server] Waiting for messages");
54
55 loop {
56 let message = channel.recv().await?;
57
58 match message.msg_type {
59 MessageType::Call => {
60 println!(
61 "[Server] Received Call: method={}, id={}",
62 message.method, message.id
63 );
64
65 match message.method.as_str() {
66 "add" => {
67 let req: AddRequest = message.deserialize_payload()?;
68 println!("[Server] AddRequest: {} + {}", req.a, req.b);
69
70 let resp = AddResponse {
71 result: req.a + req.b,
72 };
73 let reply = Message::reply(message.id, resp)?;
74 channel.send(&reply).await?;
75 println!("[Server] Sent reply");
76 }
77 "divide" => {
78 let req: AddRequest = message.deserialize_payload()?;
79 if req.b == 0 {
80 let error = Message::error(message.id, "Division by zero");
81 channel.send(&error).await?;
82 println!("[Server] Sent error: division by zero");
83 } else {
84 let resp = AddResponse {
85 result: req.a / req.b,
86 };
87 let reply = Message::reply(message.id, resp)?;
88 channel.send(&reply).await?;
89 }
90 }
91 "shutdown" => {
92 println!("[Server] Shutdown requested");
93 let reply = Message::reply(message.id, "ok")?;
94 channel.send(&reply).await?;
95 break;
96 }
97 _ => {
98 let error = Message::error(message.id, "Unknown method");
99 channel.send(&error).await?;
100 }
101 }
102 }
103 MessageType::Notification => {
104 println!("[Server] Received Notification: method={}", message.method);
105 if message.method == "log" {
106 let event: LogEvent = message.deserialize_payload()?;
107 println!("[Server] Log [{}]: {}", event.level, event.message);
108 }
109 }
110 _ => {
111 println!("[Server] Unknown message type: {:?}", message.msg_type);
112 }
113 }
114 }
115
116 println!("[Server] Shutting down");
117 Ok(())
118}
119
120async fn run_client() -> Result<(), Box<dyn std::error::Error>> {
121 println!("[Client] Connecting to shared memory transport");
122
123 let transport = SharedMemoryFrameTransport::connect_client(SERVICE_NAME)?;
124 let channel = MessageChannelAdapter::new(transport);
125
126 println!("[Client] Connected!");
127
128 println!("\n[Client] Calling add (10 + 32)");
130 let call = Message::call("add", AddRequest { a: 10, b: 32 })?;
131 channel.send(&call).await?;
132
133 let reply = channel.recv().await?;
134 let resp: AddResponse = reply.deserialize_payload()?;
135 println!("[Client] Result: {}", resp.result);
136
137 println!("\n[Client] Calling add with LZ4 compression (100 + 200)");
139 let mut call = Message::call("add", AddRequest { a: 100, b: 200 })?;
140 call.metadata.compression = CompressionType::Lz4;
141 channel.send(&call).await?;
142
143 let reply = channel.recv().await?;
144 let resp: AddResponse = reply.deserialize_payload()?;
145 println!(
146 "[Client] Result: {} (compression: {:?})",
147 resp.result, reply.metadata.compression
148 );
149
150 println!("\n[Client] Calling divide (10 / 0)");
152 let call = Message::call("divide", AddRequest { a: 10, b: 0 })?;
153 channel.send(&call).await?;
154
155 let reply = channel.recv().await?;
156 if reply.msg_type == MessageType::Error {
157 let error_msg: String = reply.deserialize_payload()?;
158 println!("[Client] Got error: {}", error_msg);
159 }
160
161 println!("\n[Client] Sending notification log");
163 let notification = Message::notification(
164 "log",
165 LogEvent {
166 level: "INFO".to_string(),
167 message: "Client started successfully".to_string(),
168 },
169 )?;
170 channel.send(¬ification).await?;
171 println!("[Client] Notification sent (no response expected)");
172
173 println!("\n[Client] Sending shutdown...");
175 let call = Message::call("shutdown", ())?;
176 channel.send(&call).await?;
177 let _ = channel.recv().await?;
178 println!("[Client] Done!");
179
180 Ok(())
181}