1mod connection;
18mod push_client;
19
20pub use push_client::{PushClient, PushedFrame};
21
22use alloc::format;
23use alloc::string::ToString;
24use alloc::sync::Arc;
25use alloc::vec::Vec;
26use core::fmt;
27
28use liminal::protocol::{
29 CausalContext, Frame, MessageEnvelope, PUBLISH_DELIVERED_FLAG, PUBLISH_IDEMPOTENCY_KEY_FLAG,
30 SchemaId,
31};
32use spin::Mutex;
33
34use crate::{DeliveryAck, PressureResponse, SdkError};
35
36use self::connection::{Connection, unexpected_frame};
37use super::ServerAddress;
38use super::protocol::{
39 RemoteTransport, WireConversationRequest, WirePublishRequest, WireResumeRequest,
40 WireSubscribeRequest,
41};
42
/// Stream identifier placed on the publish frames built in this file.
const APPLICATION_STREAM_ID: u32 = 1;

/// `max_in_flight` value sent with each subscribe frame built in this file.
const DEFAULT_MAX_IN_FLIGHT: u32 = 1;

/// Empty schema bytes, used when building envelopes for conversation messages.
const SCHEMALESS_SCHEMA: &[u8] = b"";
49
/// Remote transport that talks to the server through one shared
/// [`Connection`].
pub struct TcpRemoteTransport {
    /// The underlying connection, reference-counted and guarded by a spin
    /// mutex; it is locked whenever a frame or message is exchanged.
    connection: Arc<Mutex<Connection>>,
}
54
55impl fmt::Debug for TcpRemoteTransport {
56 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
57 formatter
58 .debug_struct("TcpRemoteTransport")
59 .finish_non_exhaustive()
60 }
61}
62
63impl TcpRemoteTransport {
64 pub fn connect(server_address: &ServerAddress) -> Result<Self, SdkError> {
72 let connection = Connection::connect(server_address.as_str())?;
73 Ok(Self {
74 connection: Arc::new(Mutex::new(connection)),
75 })
76 }
77
78 fn round_trip(&self, request: &Frame) -> Result<Frame, SdkError> {
79 let mut connection = self.connection.lock();
80 connection.round_trip(request)
81 }
82}
83
84impl RemoteTransport for TcpRemoteTransport {
85 fn publish(
86 &self,
87 _server_address: &ServerAddress,
88 request: &WirePublishRequest,
89 ) -> Result<PressureResponse, SdkError> {
90 let frame = build_publish_frame(request);
91 let response = self.round_trip(&frame)?;
92 publish_response(response)
93 }
94
95 fn publish_with_delivery(
96 &self,
97 _server_address: &ServerAddress,
98 request: &WirePublishRequest,
99 ) -> Result<DeliveryAck, SdkError> {
100 let frame = build_publish_frame(request);
101 let response = self.round_trip(&frame)?;
102 publish_delivery_response(response)
103 }
104
105 fn subscribe(
106 &self,
107 _server_address: &ServerAddress,
108 request: &WireSubscribeRequest,
109 ) -> Result<(), SdkError> {
110 let frame = Frame::Subscribe {
111 flags: 0,
112 stream_id: request.stream_id(),
113 channel: request.channel().to_string(),
114 accepted_schemas: Vec::new(),
117 max_in_flight: DEFAULT_MAX_IN_FLIGHT,
118 };
119 let response = self.round_trip(&frame)?;
120 subscribe_response(response)
121 }
122
123 fn send_conversation(
124 &self,
125 _server_address: &ServerAddress,
126 request: &WireConversationRequest,
127 ) -> Result<(), SdkError> {
128 let conversation_label = request.conversation_id().as_str();
129 let conversation_id = conversation_wire_id(conversation_label);
130 let envelope = build_envelope(SCHEMALESS_SCHEMA, request.payload());
131 let mut connection = self.connection.lock();
132 connection.send_conversation_message(conversation_id, conversation_label, envelope)
133 }
134
135 fn request_reply_conversation(
136 &self,
137 _server_address: &ServerAddress,
138 request: &WireConversationRequest,
139 ) -> Result<Vec<u8>, SdkError> {
140 let conversation_label = request.conversation_id().as_str();
141 let conversation_id = conversation_wire_id(conversation_label);
142 let envelope = build_envelope(SCHEMALESS_SCHEMA, request.payload());
143 let mut connection = self.connection.lock();
144 connection.conversation_request_reply(conversation_id, conversation_label, envelope)
145 }
146
147 fn resume(
148 &self,
149 _server_address: &ServerAddress,
150 request: &WireResumeRequest,
151 ) -> Result<(), SdkError> {
152 let _ = (request.subscription_id(), request.resume_from_sequence());
159 Err(SdkError::Protocol {
160 description:
161 "resume is not yet supported over the TCP transport; re-subscribe to trigger \
162 server replay"
163 .to_string(),
164 })
165 }
166}
167
168fn build_envelope(schema_bytes: &[u8], payload: &[u8]) -> MessageEnvelope {
169 MessageEnvelope::new(
170 schema_id_from_bytes(schema_bytes),
171 CausalContext::independent(),
172 payload.to_vec(),
173 )
174}
175
176fn schema_id_from_bytes(schema_bytes: &[u8]) -> SchemaId {
182 let mut id = [0_u8; SchemaId::WIRE_LEN];
183 let mut hash = fnv1a(schema_bytes).to_be_bytes();
184 for (index, slot) in id.iter_mut().enumerate() {
186 *slot = hash[index % hash.len()];
187 if index % hash.len() == hash.len() - 1 {
188 hash = fnv1a(&hash).to_be_bytes();
189 }
190 }
191 SchemaId::new(id)
192}
193
194fn conversation_wire_id(conversation_id: &str) -> u64 {
195 fnv1a(conversation_id.as_bytes())
196}
197
/// Computes the 64-bit FNV-1a hash of `bytes`.
///
/// Starting from the offset basis, each byte is XORed into the state, which
/// is then multiplied by the FNV prime with wrapping arithmetic. An empty
/// input returns the offset basis unchanged.
fn fnv1a(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    bytes.iter().fold(OFFSET_BASIS, |state, &byte| {
        (state ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
209
210fn build_publish_frame(request: &WirePublishRequest) -> Frame {
214 let envelope = build_envelope(request.schema().schema.as_ref(), request.payload());
215 let flags = match request.idempotency_key() {
216 Some(_) => PUBLISH_IDEMPOTENCY_KEY_FLAG,
217 None => 0,
218 };
219 Frame::Publish {
220 flags,
221 stream_id: APPLICATION_STREAM_ID,
222 channel: request.channel().to_string(),
223 envelope,
224 idempotency_key: request.idempotency_key().map(ToString::to_string),
225 }
226}
227
228fn publish_response(frame: Frame) -> Result<PressureResponse, SdkError> {
229 match frame {
230 Frame::PublishAck { .. } => Ok(PressureResponse::Accept),
231 Frame::PublishError {
232 reason_code,
233 message,
234 ..
235 } => Err(SdkError::Backpressure {
236 reason: format!(
237 "server rejected publish (reason {reason_code}): {}",
238 message.unwrap_or_else(|| "no detail".to_string())
239 ),
240 }),
241 other => Err(unexpected_frame("PublishAck", &other)),
242 }
243}
244
245fn publish_delivery_response(frame: Frame) -> Result<DeliveryAck, SdkError> {
248 match frame {
249 Frame::PublishAck { flags, .. } => {
250 let accepted = flags & PUBLISH_DELIVERED_FLAG != 0;
251 Ok(DeliveryAck::new(PressureResponse::Accept, accepted))
252 }
253 Frame::PublishError {
254 reason_code,
255 message,
256 ..
257 } => Err(SdkError::Backpressure {
258 reason: format!(
259 "server rejected publish (reason {reason_code}): {}",
260 message.unwrap_or_else(|| "no detail".to_string())
261 ),
262 }),
263 other => Err(unexpected_frame("PublishAck", &other)),
264 }
265}
266
267fn subscribe_response(frame: Frame) -> Result<(), SdkError> {
268 match frame {
269 Frame::SubscribeAck { .. } => Ok(()),
270 Frame::SubscribeError {
271 reason_code,
272 message,
273 ..
274 } => Err(SdkError::Protocol {
275 description: format!(
276 "server rejected subscribe (reason {reason_code}): {}",
277 message.unwrap_or_else(|| "no detail".to_string())
278 ),
279 }),
280 other => Err(unexpected_frame("SubscribeAck", &other)),
281 }
282}
283
#[cfg(test)]
mod tests {
    use super::*;

    /// Equal schema bytes hash to equal ids; different bytes to different ids.
    #[test]
    fn schema_ids_are_deterministic_and_distinct() {
        let first = schema_id_from_bytes(b"a");
        let repeat = schema_id_from_bytes(b"a");
        let different = schema_id_from_bytes(b"b");
        assert_eq!(first, repeat);
        assert_ne!(first, different);
    }

    /// The same label always maps to the same wire id; another label does not.
    #[test]
    fn conversation_ids_are_stable() {
        let first = conversation_wire_id("chat");
        let repeat = conversation_wire_id("chat");
        let different = conversation_wire_id("other");
        assert_eq!(first, repeat);
        assert_ne!(first, different);
    }

    /// A publish ack is reported as an accepted publish.
    #[test]
    fn publish_ack_maps_to_accept() -> Result<(), SdkError> {
        let ack = Frame::PublishAck {
            flags: 0,
            stream_id: 1,
            message_id: 7,
        };
        let outcome = publish_response(ack)?;
        assert_eq!(outcome, PressureResponse::Accept);
        Ok(())
    }

    /// A publish error frame surfaces as a backpressure error.
    #[test]
    fn publish_error_maps_to_backpressure() {
        let rejection = Frame::PublishError {
            flags: 0,
            stream_id: 1,
            reason_code: 9,
            message: Some("nope".to_string()),
        };
        let outcome = publish_response(rejection);
        assert!(matches!(outcome, Err(SdkError::Backpressure { .. })));
    }
}
324}