pub struct MessageChannelAdapter<F: FrameTransport, C: Codec = BincodeCodec> { /* private fields */ }
Expand description
Adapter that wraps a FrameTransport to provide MessageChannel functionality.
Implementations§
Source§impl<F: FrameTransport> MessageChannelAdapter<F, BincodeCodec>
impl<F: FrameTransport> MessageChannelAdapter<F, BincodeCodec>
Sourcepub fn new(transport: F) -> Self
pub fn new(transport: F) -> Self
Create a new adapter with default BincodeCodec.
Examples found in repository?
examples/rpc_client_server.rs (line 56)
51async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
52 println!("[Server] Starting RPC server");
53
54 let config = SharedMemoryConfig::default();
55 let transport = SharedMemoryFrameTransport::create_server(SERVICE_NAME, config)?;
56 let channel = Arc::new(MessageChannelAdapter::new(transport));
57
58 let server = RpcServer::new();
59
60 server.register_typed("add", |req: AddRequest| async move {
61 println!("[Server] add({}, {})", req.a, req.b);
62 Ok(AddResponse {
63 result: req.a + req.b,
64 })
65 });
66
67 server.register_typed("echo", |req: EchoRequest| async move {
68 println!("[Server] echo(\"{}\")", req.message);
69 let len = req.message.len();
70 Ok(EchoResponse {
71 message: req.message,
72 length: len,
73 })
74 });
75
76 println!("[Server] Registered {} handlers", server.handler_count());
77 println!("[Server] Waiting for requests\n");
78
79 server.serve(channel).await?;
80
81 Ok(())
82}
83
84async fn run_client() -> Result<(), Box<dyn std::error::Error>> {
85 println!("[Client] Connecting to RPC server");
86
87 let transport = SharedMemoryFrameTransport::connect_client(SERVICE_NAME)?;
88 let channel = MessageChannelAdapter::new(transport);
89
90 let client = RpcClient::new(channel);
91 let _handle = client.start();
92
93 println!("[Client] Connected!\n");
94
95 println!("[Client] Calling add(10, 32)");
96 let resp: AddResponse = client.call("add", &AddRequest { a: 10, b: 32 }).await?;
97 println!("[Client] Result: {}\n", resp.result);
98
99 println!("[Client] Calling add(100, 200)");
100 let resp: AddResponse = client.call("add", &AddRequest { a: 100, b: 200 }).await?;
101 println!("[Client] Result: {}\n", resp.result);
102
103 println!("[Client] Calling echo(\"Hello, RPC!\")");
104 let resp: EchoResponse = client
105 .call(
106 "echo",
107 &EchoRequest {
108 message: "Hello, RPC!".to_string(),
109 },
110 )
111 .await?;
112 println!(
113 "[Client] Result: message=\"{}\", length={}\n",
114 resp.message, resp.length
115 );
116
117 println!("[Client] Calling unknown method");
118 let result: Result<(), _> = client.call("unknown", &()).await;
119 match result {
120 Ok(_) => println!("[Client] Unexpected success"),
121 Err(e) => println!("[Client] Got expected error: {}\n", e),
122 }
123
124 client.close().await?;
125 println!("[Client] Done!");
126
127 Ok(())
128}
More examples
examples/message_transport_shm.rs (line 51)
46async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
47 println!("[Server] Creating shared memory transport");
48
49 let config = SharedMemoryConfig::default();
50 let transport = SharedMemoryFrameTransport::create_server(SERVICE_NAME, config)?;
51 let channel = MessageChannelAdapter::new(transport);
52
53 println!("[Server] Waiting for messages");
54
55 loop {
56 let message = channel.recv().await?;
57
58 match message.msg_type {
59 MessageType::Call => {
60 println!(
61 "[Server] Received Call: method={}, id={}",
62 message.method, message.id
63 );
64
65 match message.method.as_str() {
66 "add" => {
67 let req: AddRequest = message.deserialize_payload()?;
68 println!("[Server] AddRequest: {} + {}", req.a, req.b);
69
70 let resp = AddResponse {
71 result: req.a + req.b,
72 };
73 let reply = Message::reply(message.id, resp)?;
74 channel.send(&reply).await?;
75 println!("[Server] Sent reply");
76 }
77 "divide" => {
78 let req: AddRequest = message.deserialize_payload()?;
79 if req.b == 0 {
80 let error = Message::error(message.id, "Division by zero");
81 channel.send(&error).await?;
82 println!("[Server] Sent error: division by zero");
83 } else {
84 let resp = AddResponse {
85 result: req.a / req.b,
86 };
87 let reply = Message::reply(message.id, resp)?;
88 channel.send(&reply).await?;
89 }
90 }
91 "shutdown" => {
92 println!("[Server] Shutdown requested");
93 let reply = Message::reply(message.id, "ok")?;
94 channel.send(&reply).await?;
95 break;
96 }
97 _ => {
98 let error = Message::error(message.id, "Unknown method");
99 channel.send(&error).await?;
100 }
101 }
102 }
103 MessageType::Notification => {
104 println!("[Server] Received Notification: method={}", message.method);
105 if message.method == "log" {
106 let event: LogEvent = message.deserialize_payload()?;
107 println!("[Server] Log [{}]: {}", event.level, event.message);
108 }
109 }
110 _ => {
111 println!("[Server] Unknown message type: {:?}", message.msg_type);
112 }
113 }
114 }
115
116 println!("[Server] Shutting down");
117 Ok(())
118}
119
120async fn run_client() -> Result<(), Box<dyn std::error::Error>> {
121 println!("[Client] Connecting to shared memory transport");
122
123 let transport = SharedMemoryFrameTransport::connect_client(SERVICE_NAME)?;
124 let channel = MessageChannelAdapter::new(transport);
125
126 println!("[Client] Connected!");
127
128 // Call: add (no compression)
129 println!("\n[Client] Calling add (10 + 32)");
130 let call = Message::call("add", AddRequest { a: 10, b: 32 })?;
131 channel.send(&call).await?;
132
133 let reply = channel.recv().await?;
134 let resp: AddResponse = reply.deserialize_payload()?;
135 println!("[Client] Result: {}", resp.result);
136
137 // Call: add with LZ4 compression
138 println!("\n[Client] Calling add with LZ4 compression (100 + 200)");
139 let mut call = Message::call("add", AddRequest { a: 100, b: 200 })?;
140 call.metadata.compression = CompressionType::Lz4;
141 channel.send(&call).await?;
142
143 let reply = channel.recv().await?;
144 let resp: AddResponse = reply.deserialize_payload()?;
145 println!(
146 "[Client] Result: {} (compression: {:?})",
147 resp.result, reply.metadata.compression
148 );
149
150 // Call: divide by zero (expect error)
151 println!("\n[Client] Calling divide (10 / 0)");
152 let call = Message::call("divide", AddRequest { a: 10, b: 0 })?;
153 channel.send(&call).await?;
154
155 let reply = channel.recv().await?;
156 if reply.msg_type == MessageType::Error {
157 let error_msg: String = reply.deserialize_payload()?;
158 println!("[Client] Got error: {}", error_msg);
159 }
160
161 // Notification (fire-and-forget)
162 println!("\n[Client] Sending notification log");
163 let notification = Message::notification(
164 "log",
165 LogEvent {
166 level: "INFO".to_string(),
167 message: "Client started successfully".to_string(),
168 },
169 )?;
170 channel.send(&notification).await?;
171 println!("[Client] Notification sent (no response expected)");
172
173 // Shutdown
174 println!("\n[Client] Sending shutdown...");
175 let call = Message::call("shutdown", ())?;
176 channel.send(&call).await?;
177 let _ = channel.recv().await?;
178 println!("[Client] Done!");
179
180 Ok(())
181}
Source§impl<F: FrameTransport, C: Codec> MessageChannelAdapter<F, C>
impl<F: FrameTransport, C: Codec> MessageChannelAdapter<F, C>
Sourcepub fn with_codec(transport: F) -> Self
pub fn with_codec(transport: F) -> Self
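Create a new adapter with a specific codec. The codec is chosen through the `C` type parameter rather than passed as a value, e.g. `MessageChannelAdapter::<_, MyCodec>::with_codec(transport)`.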
Sourcepub fn into_inner(self) -> F
pub fn into_inner(self) -> F
Consume the adapter and return the inner transport.
Trait Implementations§
Source§impl<F: Debug + FrameTransport, C: Debug + Codec> Debug for MessageChannelAdapter<F, C>
impl<F: Debug + FrameTransport, C: Debug + Codec> Debug for MessageChannelAdapter<F, C>
Source§impl<F: FrameTransport, C: Codec + Default> MessageChannel<C> for MessageChannelAdapter<F, C>
impl<F: FrameTransport, C: Codec + Default> MessageChannel<C> for MessageChannelAdapter<F, C>
Source§fn send<'life0, 'life1, 'async_trait>(
&'life0 self,
message: &'life1 Message<C>,
) -> Pin<Box<dyn Future<Output = TransportResult<()>> + Send + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn send<'life0, 'life1, 'async_trait>(
&'life0 self,
message: &'life1 Message<C>,
) -> Pin<Box<dyn Future<Output = TransportResult<()>> + Send + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Send an RPC message.
Source§fn recv<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = TransportResult<Message<C>>> + Send + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
fn recv<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = TransportResult<Message<C>>> + Send + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
Receive an RPC message.
Source§fn is_connected(&self) -> bool
fn is_connected(&self) -> bool
Check if the channel is connected.
Source§fn is_healthy(&self) -> bool
fn is_healthy(&self) -> bool
Check if the channel is healthy.
Source§fn close<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = TransportResult<()>> + Send + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
fn close<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = TransportResult<()>> + Send + 'async_trait>> where
Self: 'async_trait,
'life0: 'async_trait,
Close the channel.
Source§fn stats(&self) -> Option<TransportStats>
fn stats(&self) -> Option<TransportStats>
Get transport statistics.
Auto Trait Implementations§
impl<F, C> Freeze for MessageChannelAdapter<F, C> where
F: Freeze,
impl<F, C> RefUnwindSafe for MessageChannelAdapter<F, C> where
F: RefUnwindSafe,
C: RefUnwindSafe,
impl<F, C> Send for MessageChannelAdapter<F, C>
impl<F, C> Sync for MessageChannelAdapter<F, C>
impl<F, C> Unpin for MessageChannelAdapter<F, C>
impl<F, C> UnwindSafe for MessageChannelAdapter<F, C> where
F: UnwindSafe,
C: UnwindSafe,
Blanket Implementations§
§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more