Skip to main content

liminal_sdk/remote/tcp/
mod.rs

1//! Real TCP transport for the remote SDK.
2//!
3//! Unlike [`ProtocolRemoteTransport`](super::protocol::ProtocolRemoteTransport),
4//! which only exercises the SDK's framing in-process, this transport opens a real
5//! `TcpStream` to a running `liminal-server`, performs the protocol handshake, and
6//! exchanges canonical wire frames over the socket.
7//!
8//! # Blocking model
9//!
10//! The SDK API surface is synchronous: [`RemoteTransport`] methods return plain
11//! `Result` values, and the rest of the SDK (connection pool, lifecycle) is
12//! driven by ordinary blocking calls. This transport therefore uses
13//! `std::net::TcpStream` in blocking mode with explicit read/write timeouts; it
14//! does not introduce an async runtime. Each transport call holds a short-lived
15//! connection lock for the duration of one request/response exchange.
16
17mod connection;
18mod push_client;
19
20pub use push_client::{PushClient, PushedFrame};
21
22use alloc::format;
23use alloc::string::ToString;
24use alloc::sync::Arc;
25use alloc::vec::Vec;
26use core::fmt;
27
28use liminal::protocol::{
29    CausalContext, Frame, MessageEnvelope, PUBLISH_DELIVERED_FLAG, PUBLISH_IDEMPOTENCY_KEY_FLAG,
30    SchemaId,
31};
32use spin::Mutex;
33
34use crate::{DeliveryAck, PressureResponse, SdkError};
35
36use self::connection::{Connection, unexpected_frame};
37use super::ServerAddress;
38use super::protocol::{
39    RemoteTransport, WireConversationRequest, WirePublishRequest, WireResumeRequest,
40    WireSubscribeRequest,
41};
42
43/// Application stream id used for non-subscription application frames.
44const APPLICATION_STREAM_ID: u32 = 1;
45/// In-flight credit advertised on subscribe; one keeps strict pacing.
46const DEFAULT_MAX_IN_FLIGHT: u32 = 1;
47/// Schema id used for payloads whose schema is not carried on the wire.
48const SCHEMALESS_SCHEMA: &[u8] = &[];
49
50/// Real TCP transport that exchanges canonical wire frames with a liminal server.
51pub struct TcpRemoteTransport {
52    connection: Arc<Mutex<Connection>>,
53}
54
55impl fmt::Debug for TcpRemoteTransport {
56    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
57        formatter
58            .debug_struct("TcpRemoteTransport")
59            .finish_non_exhaustive()
60    }
61}
62
63impl TcpRemoteTransport {
64    /// Connects to `server_address`, completes the handshake, and returns a ready transport.
65    ///
66    /// # Errors
67    ///
68    /// Returns [`SdkError::Connection`] when the TCP connection cannot be
69    /// established, and [`SdkError::Protocol`] when the handshake frames cannot be
70    /// encoded, sent, or are rejected by the server.
71    pub fn connect(server_address: &ServerAddress) -> Result<Self, SdkError> {
72        let connection = Connection::connect(server_address.as_str())?;
73        Ok(Self {
74            connection: Arc::new(Mutex::new(connection)),
75        })
76    }
77
78    fn round_trip(&self, request: &Frame) -> Result<Frame, SdkError> {
79        let mut connection = self.connection.lock();
80        connection.round_trip(request)
81    }
82}
83
84impl RemoteTransport for TcpRemoteTransport {
85    fn publish(
86        &self,
87        _server_address: &ServerAddress,
88        request: &WirePublishRequest,
89    ) -> Result<PressureResponse, SdkError> {
90        let frame = build_publish_frame(request);
91        let response = self.round_trip(&frame)?;
92        publish_response(response)
93    }
94
95    fn publish_with_delivery(
96        &self,
97        _server_address: &ServerAddress,
98        request: &WirePublishRequest,
99    ) -> Result<DeliveryAck, SdkError> {
100        let frame = build_publish_frame(request);
101        let response = self.round_trip(&frame)?;
102        publish_delivery_response(response)
103    }
104
105    fn subscribe(
106        &self,
107        _server_address: &ServerAddress,
108        request: &WireSubscribeRequest,
109    ) -> Result<(), SdkError> {
110        let frame = Frame::Subscribe {
111            flags: 0,
112            stream_id: request.stream_id(),
113            channel: request.channel().to_string(),
114            // An empty accepted-schema list lets the server select the channel's
115            // configured schema, mirroring the server's negotiation contract.
116            accepted_schemas: Vec::new(),
117            max_in_flight: DEFAULT_MAX_IN_FLIGHT,
118        };
119        let response = self.round_trip(&frame)?;
120        subscribe_response(response)
121    }
122
123    fn send_conversation(
124        &self,
125        _server_address: &ServerAddress,
126        request: &WireConversationRequest,
127    ) -> Result<(), SdkError> {
128        let conversation_label = request.conversation_id().as_str();
129        let conversation_id = conversation_wire_id(conversation_label);
130        let envelope = build_envelope(SCHEMALESS_SCHEMA, request.payload());
131        let mut connection = self.connection.lock();
132        connection.send_conversation_message(conversation_id, conversation_label, envelope)
133    }
134
135    fn request_reply_conversation(
136        &self,
137        _server_address: &ServerAddress,
138        request: &WireConversationRequest,
139    ) -> Result<Vec<u8>, SdkError> {
140        let conversation_label = request.conversation_id().as_str();
141        let conversation_id = conversation_wire_id(conversation_label);
142        let envelope = build_envelope(SCHEMALESS_SCHEMA, request.payload());
143        let mut connection = self.connection.lock();
144        connection.conversation_request_reply(conversation_id, conversation_label, envelope)
145    }
146
147    fn resume(
148        &self,
149        _server_address: &ServerAddress,
150        request: &WireResumeRequest,
151    ) -> Result<(), SdkError> {
152        // The wire protocol has no resume frame: the server replays a subscription
153        // from its durable log only when the SDK re-issues the Subscribe for that
154        // stream on reconnect. This transport does not retain the channel/stream
155        // mapping needed to re-drive that Subscribe here, so it cannot honour the
156        // resume over the socket. Returning a clear error keeps the contract honest
157        // rather than reporting success while dropping the user's resume intent.
158        let _ = (request.subscription_id(), request.resume_from_sequence());
159        Err(SdkError::Protocol {
160            description:
161                "resume is not yet supported over the TCP transport; re-subscribe to trigger \
162                 server replay"
163                    .to_string(),
164        })
165    }
166}
167
168fn build_envelope(schema_bytes: &[u8], payload: &[u8]) -> MessageEnvelope {
169    MessageEnvelope::new(
170        schema_id_from_bytes(schema_bytes),
171        CausalContext::independent(),
172        payload.to_vec(),
173    )
174}
175
176/// Derives a stable 32-byte schema id from arbitrary schema bytes via FNV-1a.
177///
178/// The server selects the channel's configured schema on subscribe and stores the
179/// published envelope verbatim, so this id only needs to be deterministic, not a
180/// negotiated value.
181fn schema_id_from_bytes(schema_bytes: &[u8]) -> SchemaId {
182    let mut id = [0_u8; SchemaId::WIRE_LEN];
183    let mut hash = fnv1a(schema_bytes).to_be_bytes();
184    // Spread the 8-byte digest across the 32-byte id deterministically.
185    for (index, slot) in id.iter_mut().enumerate() {
186        *slot = hash[index % hash.len()];
187        if index % hash.len() == hash.len() - 1 {
188            hash = fnv1a(&hash).to_be_bytes();
189        }
190    }
191    SchemaId::new(id)
192}
193
194fn conversation_wire_id(conversation_id: &str) -> u64 {
195    fnv1a(conversation_id.as_bytes())
196}
197
198/// FNV-1a 64-bit hash, used only for deterministic wire-id derivation.
199fn fnv1a(bytes: &[u8]) -> u64 {
200    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
201    const PRIME: u64 = 0x0000_0100_0000_01b3;
202    let mut hash = OFFSET_BASIS;
203    for byte in bytes {
204        hash ^= u64::from(*byte);
205        hash = hash.wrapping_mul(PRIME);
206    }
207    hash
208}
209
210/// Builds the wire `Publish` frame, attaching the idempotency key (and its flag)
211/// only when the request carries one so a no-key publish stays byte-identical to
212/// the pre-13-L1 layout.
213fn build_publish_frame(request: &WirePublishRequest) -> Frame {
214    let envelope = build_envelope(request.schema().schema.as_ref(), request.payload());
215    let flags = match request.idempotency_key() {
216        Some(_) => PUBLISH_IDEMPOTENCY_KEY_FLAG,
217        None => 0,
218    };
219    Frame::Publish {
220        flags,
221        stream_id: APPLICATION_STREAM_ID,
222        channel: request.channel().to_string(),
223        envelope,
224        idempotency_key: request.idempotency_key().map(ToString::to_string),
225    }
226}
227
228fn publish_response(frame: Frame) -> Result<PressureResponse, SdkError> {
229    match frame {
230        Frame::PublishAck { .. } => Ok(PressureResponse::Accept),
231        Frame::PublishError {
232            reason_code,
233            message,
234            ..
235        } => Err(SdkError::Backpressure {
236            reason: format!(
237                "server rejected publish (reason {reason_code}): {}",
238                message.unwrap_or_else(|| "no detail".to_string())
239            ),
240        }),
241        other => Err(unexpected_frame("PublishAck", &other)),
242    }
243}
244
245/// Maps a publish ack into a genuine delivery ack: the `PUBLISH_DELIVERED_FLAG`
246/// bit on the ack reports whether a subscriber actually received the message.
247fn publish_delivery_response(frame: Frame) -> Result<DeliveryAck, SdkError> {
248    match frame {
249        Frame::PublishAck { flags, .. } => {
250            let accepted = flags & PUBLISH_DELIVERED_FLAG != 0;
251            Ok(DeliveryAck::new(PressureResponse::Accept, accepted))
252        }
253        Frame::PublishError {
254            reason_code,
255            message,
256            ..
257        } => Err(SdkError::Backpressure {
258            reason: format!(
259                "server rejected publish (reason {reason_code}): {}",
260                message.unwrap_or_else(|| "no detail".to_string())
261            ),
262        }),
263        other => Err(unexpected_frame("PublishAck", &other)),
264    }
265}
266
267fn subscribe_response(frame: Frame) -> Result<(), SdkError> {
268    match frame {
269        Frame::SubscribeAck { .. } => Ok(()),
270        Frame::SubscribeError {
271            reason_code,
272            message,
273            ..
274        } => Err(SdkError::Protocol {
275            description: format!(
276                "server rejected subscribe (reason {reason_code}): {}",
277                message.unwrap_or_else(|| "no detail".to_string())
278            ),
279        }),
280        other => Err(unexpected_frame("SubscribeAck", &other)),
281    }
282}
283
284#[cfg(test)]
285mod tests {
286    use super::*;
287
288    #[test]
289    fn schema_ids_are_deterministic_and_distinct() {
290        assert_eq!(schema_id_from_bytes(b"a"), schema_id_from_bytes(b"a"));
291        assert_ne!(schema_id_from_bytes(b"a"), schema_id_from_bytes(b"b"));
292    }
293
294    #[test]
295    fn conversation_ids_are_stable() {
296        assert_eq!(conversation_wire_id("chat"), conversation_wire_id("chat"));
297        assert_ne!(conversation_wire_id("chat"), conversation_wire_id("other"));
298    }
299
300    #[test]
301    fn publish_ack_maps_to_accept() -> Result<(), SdkError> {
302        let frame = Frame::PublishAck {
303            flags: 0,
304            stream_id: 1,
305            message_id: 7,
306        };
307        assert_eq!(publish_response(frame)?, PressureResponse::Accept);
308        Ok(())
309    }
310
311    #[test]
312    fn publish_error_maps_to_backpressure() {
313        let frame = Frame::PublishError {
314            flags: 0,
315            stream_id: 1,
316            reason_code: 9,
317            message: Some("nope".to_string()),
318        };
319        assert!(matches!(
320            publish_response(frame),
321            Err(SdkError::Backpressure { .. })
322        ));
323    }
324}