pub struct Message<C: Codec = BincodeCodec> {
pub id: MessageId,
pub msg_type: MessageType,
pub method: String,
pub payload: Bytes,
pub metadata: MessageMetadata,
pub codec: C,
}Fields§
§id: MessageId§msg_type: MessageType§method: String§payload: Bytes§metadata: MessageMetadata§codec: CImplementations§
Source§impl<C: Codec + Default> Message<C>
impl<C: Codec + Default> Message<C>
pub fn new( id: MessageId, msg_type: MessageType, method: impl Into<String>, payload: Bytes, metadata: MessageMetadata, ) -> Self
Sourcepub fn call<T: Serialize>(method: impl Into<String>, request: T) -> Result<Self>
pub fn call<T: Serialize>(method: impl Into<String>, request: T) -> Result<Self>
Examples found in repository?
examples/message_transport_shm.rs (line 130)
120async fn run_client() -> Result<(), Box<dyn std::error::Error>> {
121 println!("[Client] Connecting to shared memory transport");
122
123 let transport = SharedMemoryFrameTransport::connect_client(SERVICE_NAME)?;
124 let channel = MessageChannelAdapter::new(transport);
125
126 println!("[Client] Connected!");
127
128 // Call: add (no compression)
129 println!("\n[Client] Calling add (10 + 32)");
130 let call = Message::call("add", AddRequest { a: 10, b: 32 })?;
131 channel.send(&call).await?;
132
133 let reply = channel.recv().await?;
134 let resp: AddResponse = reply.deserialize_payload()?;
135 println!("[Client] Result: {}", resp.result);
136
137 // Call: add with LZ4 compression
138 println!("\n[Client] Calling add with LZ4 compression (100 + 200)");
139 let mut call = Message::call("add", AddRequest { a: 100, b: 200 })?;
140 call.metadata.compression = CompressionType::Lz4;
141 channel.send(&call).await?;
142
143 let reply = channel.recv().await?;
144 let resp: AddResponse = reply.deserialize_payload()?;
145 println!(
146 "[Client] Result: {} (compression: {:?})",
147 resp.result, reply.metadata.compression
148 );
149
150 // Call: divide by zero (expect error)
151 println!("\n[Client] Calling divide (10 / 0)");
152 let call = Message::call("divide", AddRequest { a: 10, b: 0 })?;
153 channel.send(&call).await?;
154
155 let reply = channel.recv().await?;
156 if reply.msg_type == MessageType::Error {
157 let error_msg: String = reply.deserialize_payload()?;
158 println!("[Client] Got error: {}", error_msg);
159 }
160
161 // Notification (fire-and-forget)
162 println!("\n[Client] Sending notification log");
163 let notification = Message::notification(
164 "log",
165 LogEvent {
166 level: "INFO".to_string(),
167 message: "Client started successfully".to_string(),
168 },
169 )?;
170 channel.send(&notification).await?;
171 println!("[Client] Notification sent (no response expected)");
172
173 // Shutdown
174 println!("\n[Client] Sending shutdown...");
175 let call = Message::call("shutdown", ())?;
176 channel.send(&call).await?;
177 let _ = channel.recv().await?;
178 println!("[Client] Done!");
179
180 Ok(())
181}Sourcepub fn reply<T: Serialize>(id: MessageId, response: T) -> Result<Self>
pub fn reply<T: Serialize>(id: MessageId, response: T) -> Result<Self>
Examples found in repository?
examples/message_transport_shm.rs (line 73)
46async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
47 println!("[Server] Creating shared memory transport");
48
49 let config = SharedMemoryConfig::default();
50 let transport = SharedMemoryFrameTransport::create_server(SERVICE_NAME, config)?;
51 let channel = MessageChannelAdapter::new(transport);
52
53 println!("[Server] Waiting for messages");
54
55 loop {
56 let message = channel.recv().await?;
57
58 match message.msg_type {
59 MessageType::Call => {
60 println!(
61 "[Server] Received Call: method={}, id={}",
62 message.method, message.id
63 );
64
65 match message.method.as_str() {
66 "add" => {
67 let req: AddRequest = message.deserialize_payload()?;
68 println!("[Server] AddRequest: {} + {}", req.a, req.b);
69
70 let resp = AddResponse {
71 result: req.a + req.b,
72 };
73 let reply = Message::reply(message.id, resp)?;
74 channel.send(&reply).await?;
75 println!("[Server] Sent reply");
76 }
77 "divide" => {
78 let req: AddRequest = message.deserialize_payload()?;
79 if req.b == 0 {
80 let error = Message::error(message.id, "Division by zero");
81 channel.send(&error).await?;
82 println!("[Server] Sent error: division by zero");
83 } else {
84 let resp = AddResponse {
85 result: req.a / req.b,
86 };
87 let reply = Message::reply(message.id, resp)?;
88 channel.send(&reply).await?;
89 }
90 }
91 "shutdown" => {
92 println!("[Server] Shutdown requested");
93 let reply = Message::reply(message.id, "ok")?;
94 channel.send(&reply).await?;
95 break;
96 }
97 _ => {
98 let error = Message::error(message.id, "Unknown method");
99 channel.send(&error).await?;
100 }
101 }
102 }
103 MessageType::Notification => {
104 println!("[Server] Received Notification: method={}", message.method);
105 if message.method == "log" {
106 let event: LogEvent = message.deserialize_payload()?;
107 println!("[Server] Log [{}]: {}", event.level, event.message);
108 }
109 }
110 _ => {
111 println!("[Server] Unknown message type: {:?}", message.msg_type);
112 }
113 }
114 }
115
116 println!("[Server] Shutting down");
117 Ok(())
118}Sourcepub fn notification<T: Serialize>(
method: impl Into<String>,
data: T,
) -> Result<Self>
pub fn notification<T: Serialize>( method: impl Into<String>, data: T, ) -> Result<Self>
Examples found in repository?
examples/message_transport_shm.rs (lines 163-169)
120async fn run_client() -> Result<(), Box<dyn std::error::Error>> {
121 println!("[Client] Connecting to shared memory transport");
122
123 let transport = SharedMemoryFrameTransport::connect_client(SERVICE_NAME)?;
124 let channel = MessageChannelAdapter::new(transport);
125
126 println!("[Client] Connected!");
127
128 // Call: add (no compression)
129 println!("\n[Client] Calling add (10 + 32)");
130 let call = Message::call("add", AddRequest { a: 10, b: 32 })?;
131 channel.send(&call).await?;
132
133 let reply = channel.recv().await?;
134 let resp: AddResponse = reply.deserialize_payload()?;
135 println!("[Client] Result: {}", resp.result);
136
137 // Call: add with LZ4 compression
138 println!("\n[Client] Calling add with LZ4 compression (100 + 200)");
139 let mut call = Message::call("add", AddRequest { a: 100, b: 200 })?;
140 call.metadata.compression = CompressionType::Lz4;
141 channel.send(&call).await?;
142
143 let reply = channel.recv().await?;
144 let resp: AddResponse = reply.deserialize_payload()?;
145 println!(
146 "[Client] Result: {} (compression: {:?})",
147 resp.result, reply.metadata.compression
148 );
149
150 // Call: divide by zero (expect error)
151 println!("\n[Client] Calling divide (10 / 0)");
152 let call = Message::call("divide", AddRequest { a: 10, b: 0 })?;
153 channel.send(&call).await?;
154
155 let reply = channel.recv().await?;
156 if reply.msg_type == MessageType::Error {
157 let error_msg: String = reply.deserialize_payload()?;
158 println!("[Client] Got error: {}", error_msg);
159 }
160
161 // Notification (fire-and-forget)
162 println!("\n[Client] Sending notification log");
163 let notification = Message::notification(
164 "log",
165 LogEvent {
166 level: "INFO".to_string(),
167 message: "Client started successfully".to_string(),
168 },
169 )?;
170 channel.send(&notification).await?;
171 println!("[Client] Notification sent (no response expected)");
172
173 // Shutdown
174 println!("\n[Client] Sending shutdown...");
175 let call = Message::call("shutdown", ())?;
176 channel.send(&call).await?;
177 let _ = channel.recv().await?;
178 println!("[Client] Done!");
179
180 Ok(())
181}Sourcepub fn error(id: MessageId, error_msg: impl Into<String>) -> Self
pub fn error(id: MessageId, error_msg: impl Into<String>) -> Self
Examples found in repository?
examples/message_transport_shm.rs (line 80)
46async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
47 println!("[Server] Creating shared memory transport");
48
49 let config = SharedMemoryConfig::default();
50 let transport = SharedMemoryFrameTransport::create_server(SERVICE_NAME, config)?;
51 let channel = MessageChannelAdapter::new(transport);
52
53 println!("[Server] Waiting for messages");
54
55 loop {
56 let message = channel.recv().await?;
57
58 match message.msg_type {
59 MessageType::Call => {
60 println!(
61 "[Server] Received Call: method={}, id={}",
62 message.method, message.id
63 );
64
65 match message.method.as_str() {
66 "add" => {
67 let req: AddRequest = message.deserialize_payload()?;
68 println!("[Server] AddRequest: {} + {}", req.a, req.b);
69
70 let resp = AddResponse {
71 result: req.a + req.b,
72 };
73 let reply = Message::reply(message.id, resp)?;
74 channel.send(&reply).await?;
75 println!("[Server] Sent reply");
76 }
77 "divide" => {
78 let req: AddRequest = message.deserialize_payload()?;
79 if req.b == 0 {
80 let error = Message::error(message.id, "Division by zero");
81 channel.send(&error).await?;
82 println!("[Server] Sent error: division by zero");
83 } else {
84 let resp = AddResponse {
85 result: req.a / req.b,
86 };
87 let reply = Message::reply(message.id, resp)?;
88 channel.send(&reply).await?;
89 }
90 }
91 "shutdown" => {
92 println!("[Server] Shutdown requested");
93 let reply = Message::reply(message.id, "ok")?;
94 channel.send(&reply).await?;
95 break;
96 }
97 _ => {
98 let error = Message::error(message.id, "Unknown method");
99 channel.send(&error).await?;
100 }
101 }
102 }
103 MessageType::Notification => {
104 println!("[Server] Received Notification: method={}", message.method);
105 if message.method == "log" {
106 let event: LogEvent = message.deserialize_payload()?;
107 println!("[Server] Log [{}]: {}", event.level, event.message);
108 }
109 }
110 _ => {
111 println!("[Server] Unknown message type: {:?}", message.msg_type);
112 }
113 }
114 }
115
116 println!("[Server] Shutting down");
117 Ok(())
118}Sourcepub fn stream_error(
id: MessageId,
stream_id: u64,
error_msg: impl Into<String>,
) -> Self
pub fn stream_error( id: MessageId, stream_id: u64, error_msg: impl Into<String>, ) -> Self
Create an error message with stream_id for stream call failures
Sourcepub fn stream_chunk<T: Serialize>(
stream_id: u64,
sequence: u64,
data: T,
) -> Result<Self>
pub fn stream_chunk<T: Serialize>( stream_id: u64, sequence: u64, data: T, ) -> Result<Self>
Create a stream chunk message
Sourcepub fn stream_end(stream_id: u64) -> Self
pub fn stream_end(stream_id: u64) -> Self
Create a stream end message
Sourcepub fn decode(buf: impl Buf) -> TransportResult<Self>
pub fn decode(buf: impl Buf) -> TransportResult<Self>
Decode message from wire bytes
Source§impl<C: Codec> Message<C>
impl<C: Codec> Message<C>
Sourcepub fn encode(&self) -> TransportResult<BytesMut>
pub fn encode(&self) -> TransportResult<BytesMut>
Encode message to bytes
Sourcepub fn deserialize_payload<T: for<'de> Deserialize<'de>>(&self) -> Result<T>
pub fn deserialize_payload<T: for<'de> Deserialize<'de>>(&self) -> Result<T>
Examples found in repository?
examples/message_transport_shm.rs (line 67)
46async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
47 println!("[Server] Creating shared memory transport");
48
49 let config = SharedMemoryConfig::default();
50 let transport = SharedMemoryFrameTransport::create_server(SERVICE_NAME, config)?;
51 let channel = MessageChannelAdapter::new(transport);
52
53 println!("[Server] Waiting for messages");
54
55 loop {
56 let message = channel.recv().await?;
57
58 match message.msg_type {
59 MessageType::Call => {
60 println!(
61 "[Server] Received Call: method={}, id={}",
62 message.method, message.id
63 );
64
65 match message.method.as_str() {
66 "add" => {
67 let req: AddRequest = message.deserialize_payload()?;
68 println!("[Server] AddRequest: {} + {}", req.a, req.b);
69
70 let resp = AddResponse {
71 result: req.a + req.b,
72 };
73 let reply = Message::reply(message.id, resp)?;
74 channel.send(&reply).await?;
75 println!("[Server] Sent reply");
76 }
77 "divide" => {
78 let req: AddRequest = message.deserialize_payload()?;
79 if req.b == 0 {
80 let error = Message::error(message.id, "Division by zero");
81 channel.send(&error).await?;
82 println!("[Server] Sent error: division by zero");
83 } else {
84 let resp = AddResponse {
85 result: req.a / req.b,
86 };
87 let reply = Message::reply(message.id, resp)?;
88 channel.send(&reply).await?;
89 }
90 }
91 "shutdown" => {
92 println!("[Server] Shutdown requested");
93 let reply = Message::reply(message.id, "ok")?;
94 channel.send(&reply).await?;
95 break;
96 }
97 _ => {
98 let error = Message::error(message.id, "Unknown method");
99 channel.send(&error).await?;
100 }
101 }
102 }
103 MessageType::Notification => {
104 println!("[Server] Received Notification: method={}", message.method);
105 if message.method == "log" {
106 let event: LogEvent = message.deserialize_payload()?;
107 println!("[Server] Log [{}]: {}", event.level, event.message);
108 }
109 }
110 _ => {
111 println!("[Server] Unknown message type: {:?}", message.msg_type);
112 }
113 }
114 }
115
116 println!("[Server] Shutting down");
117 Ok(())
118}
119
120async fn run_client() -> Result<(), Box<dyn std::error::Error>> {
121 println!("[Client] Connecting to shared memory transport");
122
123 let transport = SharedMemoryFrameTransport::connect_client(SERVICE_NAME)?;
124 let channel = MessageChannelAdapter::new(transport);
125
126 println!("[Client] Connected!");
127
128 // Call: add (no compression)
129 println!("\n[Client] Calling add (10 + 32)");
130 let call = Message::call("add", AddRequest { a: 10, b: 32 })?;
131 channel.send(&call).await?;
132
133 let reply = channel.recv().await?;
134 let resp: AddResponse = reply.deserialize_payload()?;
135 println!("[Client] Result: {}", resp.result);
136
137 // Call: add with LZ4 compression
138 println!("\n[Client] Calling add with LZ4 compression (100 + 200)");
139 let mut call = Message::call("add", AddRequest { a: 100, b: 200 })?;
140 call.metadata.compression = CompressionType::Lz4;
141 channel.send(&call).await?;
142
143 let reply = channel.recv().await?;
144 let resp: AddResponse = reply.deserialize_payload()?;
145 println!(
146 "[Client] Result: {} (compression: {:?})",
147 resp.result, reply.metadata.compression
148 );
149
150 // Call: divide by zero (expect error)
151 println!("\n[Client] Calling divide (10 / 0)");
152 let call = Message::call("divide", AddRequest { a: 10, b: 0 })?;
153 channel.send(&call).await?;
154
155 let reply = channel.recv().await?;
156 if reply.msg_type == MessageType::Error {
157 let error_msg: String = reply.deserialize_payload()?;
158 println!("[Client] Got error: {}", error_msg);
159 }
160
161 // Notification (fire-and-forget)
162 println!("\n[Client] Sending notification log");
163 let notification = Message::notification(
164 "log",
165 LogEvent {
166 level: "INFO".to_string(),
167 message: "Client started successfully".to_string(),
168 },
169 )?;
170 channel.send(&notification).await?;
171 println!("[Client] Notification sent (no response expected)");
172
173 // Shutdown
174 println!("\n[Client] Sending shutdown...");
175 let call = Message::call("shutdown", ())?;
176 channel.send(&call).await?;
177 let _ = channel.recv().await?;
178 println!("[Client] Done!");
179
180 Ok(())
181}Trait Implementations§
Auto Trait Implementations§
impl<C = BincodeCodec> !Freeze for Message<C>
impl<C> RefUnwindSafe for Message<C>where
C: RefUnwindSafe,
impl<C> Send for Message<C>
impl<C> Sync for Message<C>
impl<C> Unpin for Message<C>where
C: Unpin,
impl<C> UnwindSafe for Message<C>where
C: UnwindSafe,
Blanket Implementations§
§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
§unsafe fn clone_to_uninit(&self, dest: *mut u8)
unsafe fn clone_to_uninit(&self, dest: *mut u8)
🔬This is a nightly-only experimental API. (
clone_to_uninit)