liminal_sdk/remote/tcp/push_client.rs
1//! Client-side background reader for server-initiated pushes.
2//!
3//! Every other SDK transport call is request/response: the client writes a frame
4//! and reads exactly one reply to its own request ([`Connection::round_trip`]). A
5//! server PUSH inverts that — the server writes a [`Frame::Push`] on the client's
6//! existing connection at a time of the server's choosing, with no outstanding
7//! client request to read it. [`PushClient`] is the piece that consumes those
8//! inbound frames: it owns a connection whose socket is drained by a dedicated
9//! background reader thread, surfaces each pushed frame on a channel, and lets the
10//! caller send back a correlated [`Frame::PushReply`] on the same socket.
11//!
12//! # Read/write split
13//!
14//! A push connection is read concurrently (the background thread blocks on the
15//! socket) and written concurrently (the caller replies). `TcpStream` is cloned so
16//! the reader thread owns one handle and the writer holds the other behind a
17//! `Mutex`; the two handles share the same underlying socket, so a reply written
18//! by the caller travels the connection the server is pushing on. This keeps the
19//! request/reply [`Connection`] (which couples a single read to a single write)
20//! completely untouched — the push path is additive, not a rewrite.
21
22use alloc::format;
23use alloc::string::ToString;
24use alloc::sync::Arc;
25use alloc::vec;
26use alloc::vec::Vec;
27use core::time::Duration;
28
29use std::io::{Read, Write};
30use std::net::TcpStream;
31use std::sync::Mutex;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender, channel};
34use std::thread::JoinHandle;
35
36use liminal::protocol::{
37 CausalContext, Frame, MessageEnvelope, ProtocolError, ProtocolVersion, SchemaId,
38 WorkerRegisterOutcome, WorkerRegistration, decode, encode, encoded_len,
39};
40
41use crate::SdkError;
42
/// Minimum protocol version this client advertises during the handshake.
const CLIENT_MIN_VERSION: ProtocolVersion = ProtocolVersion::new(1, 0);
/// Maximum protocol version this client advertises during the handshake.
const CLIENT_MAX_VERSION: ProtocolVersion = ProtocolVersion::new(1, 0);
/// Bound on a single socket write; installed as the socket's write timeout when
/// the connection is opened.
const WRITE_TIMEOUT: Duration = Duration::from_secs(5);
/// Poll cadence the reader thread uses so it can observe the stop flag promptly
/// between reads while still blocking efficiently on the socket the rest of the
/// time.
///
/// It is installed as the socket's read timeout before the handshake runs, so it
/// also bounds each read of the synchronous handshake / registration replies.
const READER_POLL_TIMEOUT: Duration = Duration::from_millis(100);
/// Read chunk size used when draining the socket into the frame buffer.
const READ_CHUNK_BYTES: usize = 4096;
/// Upper bound on a single buffered frame, guarding against runaway buffering.
///
/// The bound is checked before each socket read, so the buffer may overshoot it
/// by at most one read chunk before the error is raised.
const MAX_FRAME_BYTES: usize = 16 * 1024 * 1024;
/// Application stream id used for the frames this client originates on the push
/// connection: push replies and publishes.
const APPLICATION_STREAM_ID: u32 = 1;

/// The reserved channel a worker publishes agent-observability events to over its
/// existing push connection.
///
/// It is NOT a general pub/sub channel: the server routes a publish on this exact
/// channel name straight to its `ConnectionNotifier` observability hook (bypassing
/// the channel-fan-out cluster), so a worker never needs a second connection to
/// stream a transcript. The name is a wire contract shared by the worker publisher
/// and the server's demux, so it is pinned here as the single source of truth.
pub const OBSERVABILITY_CHANNEL: &str = "aion.observability.v1";
69
/// One server-initiated push, as surfaced to the caller by the background reader.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PushedFrame {
    /// Server-assigned correlation id; the reply must carry the same value.
    correlation_id: u64,
    /// Payload bytes exactly as the server pushed them (opaque to the client).
    payload: Vec<u8>,
}

impl PushedFrame {
    /// Returns the correlation id the reply has to echo so the server can match it
    /// to this push.
    #[must_use]
    pub const fn correlation_id(&self) -> u64 {
        self.correlation_id
    }

    /// Borrows the opaque payload bytes of the push.
    #[must_use]
    pub fn payload(&self) -> &[u8] {
        self.payload.as_slice()
    }

    /// Takes ownership of the payload bytes, discarding the rest of the frame.
    #[must_use]
    pub fn into_payload(self) -> Vec<u8> {
        let Self { payload, .. } = self;
        payload
    }
}
98
/// A connected client that consumes server pushes and sends correlated replies.
///
/// Construct with [`PushClient::connect`] (or
/// [`PushClient::connect_with_registration`] for a worker); the background reader
/// starts immediately and runs until the client is dropped, the connection
/// closes, or a fatal read/decode error occurs. Pull pushed frames with
/// [`PushClient::recv_timeout`] and answer them with [`PushClient::reply`].
#[derive(Debug)]
pub struct PushClient {
    /// Write half of the shared socket, guarded so the caller's reply does not
    /// interleave bytes with any other writer.
    writer: Arc<Mutex<TcpStream>>,
    /// Inbound pushed frames surfaced by the background reader.
    inbound: Receiver<PushedFrame>,
    /// Signals the reader thread to stop; set on drop.
    stop: Arc<AtomicBool>,
    /// Background reader handle, joined on drop. It is an `Option` so `Drop` can
    /// take the handle out of `&mut self` in order to join it.
    reader: Option<JoinHandle<()>>,
}
116
117impl PushClient {
118 /// Connects to `address`, performs the protocol handshake, and starts the
119 /// background reader that drains inbound server pushes.
120 ///
121 /// # Errors
122 ///
123 /// Returns [`SdkError::Connection`] when the TCP connection or socket
124 /// configuration fails, and [`SdkError::Protocol`] when the handshake is
125 /// rejected or the socket cannot be cloned for the reader thread.
126 pub fn connect(address: &str) -> Result<Self, SdkError> {
127 let mut stream = connect_socket(address)?;
128 handshake(&mut stream)?;
129 Self::start_reader(stream)
130 }
131
132 /// Connects, performs the handshake, then synchronously registers this client
133 /// as a worker before starting the background reader.
134 ///
135 /// This mirrors the synchronous `Connect`/`ConnectAck` pattern: the
136 /// `WorkerRegister` frame is written and its [`Frame::WorkerRegisterAck`] read
137 /// on the calling thread, BEFORE the Push-only background reader is spawned, so
138 /// the ack is never swallowed by the reader. A connect-variant (rather than a
139 /// `register()` method on a connected client) is the cleanest fit: `connect`
140 /// spawns the reader as its last step, so registration must be threaded into
141 /// the connect sequence to land before that spawn; a post-connect method would
142 /// race the already-running reader for the ack frame.
143 ///
144 /// # Errors
145 ///
146 /// Returns [`SdkError::Connection`] when the TCP connection or socket
147 /// configuration fails, and [`SdkError::Protocol`] when the handshake is
148 /// rejected, the server rejects the registration (the rejection reason is
149 /// carried in the error), or the socket cannot be cloned for the reader thread.
150 pub fn connect_with_registration(
151 address: &str,
152 registration: WorkerRegistration,
153 ) -> Result<Self, SdkError> {
154 let mut stream = connect_socket(address)?;
155 handshake(&mut stream)?;
156 register(&mut stream, registration)?;
157 Self::start_reader(stream)
158 }
159
160 /// Spawns the Push-only background reader over a handshaken (and, for a worker,
161 /// already-registered) stream and returns the running client.
162 fn start_reader(stream: TcpStream) -> Result<Self, SdkError> {
163 // Clone the socket so the reader thread owns one handle and the writer
164 // holds the other; both refer to the same underlying connection.
165 let read_stream = stream.try_clone().map_err(|source| SdkError::Protocol {
166 description: format!("failed to clone push socket for reader thread: {source}"),
167 })?;
168
169 let stop = Arc::new(AtomicBool::new(false));
170 let (sender, inbound) = channel();
171 let reader_stop = Arc::clone(&stop);
172 let reader = std::thread::Builder::new()
173 .name("liminal-push-reader".to_string())
174 .spawn(move || run_reader(read_stream, &sender, &reader_stop))
175 .map_err(|source| SdkError::Protocol {
176 description: format!("failed to start push reader thread: {source}"),
177 })?;
178
179 Ok(Self {
180 writer: Arc::new(Mutex::new(stream)),
181 inbound,
182 stop,
183 reader: Some(reader),
184 })
185 }
186
187 /// Blocks up to `timeout` for the next pushed frame from the server.
188 ///
189 /// # Errors
190 ///
191 /// Returns [`SdkError::Connection`] when no push arrives within `timeout` or
192 /// the background reader has stopped (e.g. the server closed the connection).
193 pub fn recv_timeout(&self, timeout: Duration) -> Result<PushedFrame, SdkError> {
194 self.inbound.recv_timeout(timeout).map_err(|error| {
195 let detail = match error {
196 RecvTimeoutError::Timeout => "no server push arrived within the timeout",
197 RecvTimeoutError::Disconnected => {
198 "the push reader stopped before a server push arrived"
199 }
200 };
201 SdkError::Connection {
202 description: format!("push receive failed: {detail}"),
203 }
204 })
205 }
206
207 /// Sends a correlated reply to a pushed frame, echoing its correlation id so
208 /// the server matches the reply back to the originating push.
209 ///
210 /// # Errors
211 ///
212 /// Returns [`SdkError::Protocol`] when the reply frame cannot be encoded and
213 /// [`SdkError::Connection`] when it cannot be written to the socket or the
214 /// writer lock is poisoned.
215 pub fn reply(&self, correlation_id: u64, payload: Vec<u8>) -> Result<(), SdkError> {
216 let frame = Frame::new_push_reply(APPLICATION_STREAM_ID, correlation_id, payload)
217 .map_err(|error| protocol_error(&error))?;
218 let mut writer = self.writer.lock().map_err(|error| SdkError::Connection {
219 description: format!("push writer lock poisoned: {error}"),
220 })?;
221 write_frame(&mut writer, &frame)
222 }
223
224 /// A cheap, cloneable handle to this push connection's write half, for
225 /// background tasks that publish out-of-band frames on the same socket without
226 /// owning the full client (which cannot be cloned — it holds the reader thread
227 /// join handle).
228 ///
229 /// The returned [`PushWriter`] shares the client's `Arc<Mutex<TcpStream>>`, so a
230 /// frame it writes travels the SAME connection the server pushes on. It is the
231 /// worker's observability-drain leg: a drain task holds one and publishes each
232 /// [`OBSERVABILITY_CHANNEL`] event live while the client keeps serving pushes.
233 #[must_use]
234 pub fn writer_handle(&self) -> PushWriter {
235 PushWriter {
236 writer: Arc::clone(&self.writer),
237 }
238 }
239
240 /// Publish `payload` to `channel` over this connection (out-of-band from the
241 /// push/reply round trip).
242 ///
243 /// Convenience shorthand for `self.writer_handle().publish(channel, payload)`.
244 ///
245 /// # Errors
246 ///
247 /// Returns [`SdkError::Protocol`] when the publish frame cannot be encoded and
248 /// [`SdkError::Connection`] when it cannot be written to the socket or the
249 /// writer lock is poisoned.
250 pub fn publish(&self, channel: &str, payload: Vec<u8>) -> Result<(), SdkError> {
251 self.writer_handle().publish(channel, payload)
252 }
253}
254
255/// A cheap clone of a [`PushClient`]'s write half.
256///
257/// It writes `Frame::Publish` frames on the SAME socket the client receives pushes
258/// on, so a background drain task can stream observability events upstream without a
259/// second connection. Cloning is an `Arc` bump; the underlying socket and its write
260/// lock are shared with the originating [`PushClient`].
261#[derive(Clone, Debug)]
262pub struct PushWriter {
263 writer: Arc<Mutex<TcpStream>>,
264}
265
266impl PushWriter {
267 /// Publish `payload` to `channel` on the shared connection.
268 ///
269 /// Writes a single `Frame::Publish` carrying the opaque bytes verbatim (schema
270 /// id zero, an independent causal context — the server routes the reserved
271 /// observability channel straight to its notifier hook, so no schema negotiation
272 /// or ordering context is required). The write takes the shared writer lock, so
273 /// it never interleaves bytes with a concurrent push reply.
274 ///
275 /// # Errors
276 ///
277 /// Returns [`SdkError::Protocol`] when the publish frame cannot be encoded and
278 /// [`SdkError::Connection`] when it cannot be written to the socket or the writer
279 /// lock is poisoned.
280 pub fn publish(&self, channel: &str, payload: Vec<u8>) -> Result<(), SdkError> {
281 let envelope = MessageEnvelope::new(
282 SchemaId::new([0_u8; SchemaId::WIRE_LEN]),
283 CausalContext::independent(),
284 payload,
285 );
286 let frame = Frame::new_publish(APPLICATION_STREAM_ID, channel, envelope)
287 .map_err(|error| protocol_error(&error))?;
288 let mut writer = self.writer.lock().map_err(|error| SdkError::Connection {
289 description: format!("push writer lock poisoned: {error}"),
290 })?;
291 write_frame(&mut writer, &frame)
292 }
293
294 /// Send a correlated reply to a server push on the shared connection, echoing the
295 /// push's `correlation_id` so the server matches the reply to its push.
296 ///
297 /// Identical wire effect to [`PushClient::reply`], but issued from a cheap
298 /// [`PushWriter`] clone so a BACKGROUND task (e.g. a long-running agent dispatch)
299 /// can answer its own push after it completes, without holding the full client or
300 /// blocking the serve loop. Shares the writer lock, so it never interleaves bytes
301 /// with a concurrent publish or reply.
302 ///
303 /// # Errors
304 ///
305 /// Returns [`SdkError::Protocol`] when the reply frame cannot be encoded and
306 /// [`SdkError::Connection`] when it cannot be written to the socket or the writer
307 /// lock is poisoned.
308 pub fn reply(&self, correlation_id: u64, payload: Vec<u8>) -> Result<(), SdkError> {
309 let frame = Frame::new_push_reply(APPLICATION_STREAM_ID, correlation_id, payload)
310 .map_err(|error| protocol_error(&error))?;
311 let mut writer = self.writer.lock().map_err(|error| SdkError::Connection {
312 description: format!("push writer lock poisoned: {error}"),
313 })?;
314 write_frame(&mut writer, &frame)
315 }
316}
317
318impl Drop for PushClient {
319 fn drop(&mut self) {
320 self.stop.store(true, Ordering::SeqCst);
321 if let Some(reader) = self.reader.take() {
322 // The reader wakes within READER_POLL_TIMEOUT to observe the stop flag,
323 // so this join does not hang on a quiet connection.
324 reader.join().ok();
325 }
326 }
327}
328
329/// Opens and configures the push-client socket (Nagle off, bounded read/write
330/// timeouts) before any framing.
331fn connect_socket(address: &str) -> Result<TcpStream, SdkError> {
332 let stream = TcpStream::connect(address).map_err(|source| SdkError::Connection {
333 description: format!("failed to connect push client to {address}: {source}"),
334 })?;
335 stream
336 .set_nodelay(true)
337 .map_err(|source| SdkError::Connection {
338 description: format!("failed to disable Nagle for {address}: {source}"),
339 })?;
340 // A bounded read timeout lets the reader thread wake to check the stop flag
341 // even when the server is silent; without it the thread would block forever
342 // on a quiet connection and never observe drop.
343 stream
344 .set_read_timeout(Some(READER_POLL_TIMEOUT))
345 .map_err(|source| SdkError::Connection {
346 description: format!("failed to set push read timeout for {address}: {source}"),
347 })?;
348 stream
349 .set_write_timeout(Some(WRITE_TIMEOUT))
350 .map_err(|source| SdkError::Connection {
351 description: format!("failed to set push write timeout for {address}: {source}"),
352 })?;
353 Ok(stream)
354}
355
356/// Drives the synchronous worker-registration round trip
357/// (`WorkerRegister` -> `WorkerRegisterAck`) on a handshaken socket, before the
358/// background reader is spawned.
359///
360/// A `Rejected` ack maps to a typed [`SdkError::Protocol`] carrying the server's
361/// reason; any non-ack reply is a protocol error.
362fn register(stream: &mut TcpStream, registration: WorkerRegistration) -> Result<(), SdkError> {
363 let frame = Frame::WorkerRegister {
364 flags: 0,
365 registration,
366 };
367 write_frame(stream, &frame)?;
368 let mut buffer = Vec::new();
369 match read_one_frame(stream, &mut buffer)? {
370 Frame::WorkerRegisterAck {
371 outcome: WorkerRegisterOutcome::Accepted,
372 ..
373 } => Ok(()),
374 Frame::WorkerRegisterAck {
375 outcome: WorkerRegisterOutcome::Rejected { reason },
376 ..
377 } => Err(SdkError::Protocol {
378 description: format!("server rejected worker registration: {reason}"),
379 }),
380 other => Err(SdkError::Protocol {
381 description: format!(
382 "expected WorkerRegisterAck during registration, received {:?}",
383 other.frame_type()
384 ),
385 }),
386 }
387}
388
389/// Drives the client handshake (`Connect` -> `ConnectAck`) on a fresh socket.
390fn handshake(stream: &mut TcpStream) -> Result<(), SdkError> {
391 let connect = Frame::Connect {
392 flags: 0,
393 min_version: CLIENT_MIN_VERSION,
394 max_version: CLIENT_MAX_VERSION,
395 auth_token: Vec::new(),
396 };
397 write_frame(stream, &connect)?;
398 let mut buffer = Vec::new();
399 match read_one_frame(stream, &mut buffer)? {
400 Frame::ConnectAck { .. } => Ok(()),
401 Frame::ConnectError {
402 reason_code,
403 message,
404 ..
405 } => Err(SdkError::Connection {
406 description: format!(
407 "server rejected push connection (reason {reason_code}): {}",
408 message.unwrap_or_else(|| "no detail".to_string())
409 ),
410 }),
411 other => Err(SdkError::Protocol {
412 description: format!(
413 "expected ConnectAck during push handshake, received {:?}",
414 other.frame_type()
415 ),
416 }),
417 }
418}
419
420/// Background loop: drains the socket, surfacing each `Push` frame on `sender`.
421///
422/// Returns (ending the thread) when the stop flag is set, the connection closes,
423/// or a fatal decode/IO error occurs. A read timeout is non-fatal: it just lets
424/// the loop re-check the stop flag.
425fn run_reader(mut stream: TcpStream, sender: &Sender<PushedFrame>, stop: &AtomicBool) {
426 let mut buffer = Vec::new();
427 while !stop.load(Ordering::SeqCst) {
428 match next_frame(&mut stream, &mut buffer) {
429 Ok(Some(Frame::Push {
430 correlation_id,
431 payload,
432 ..
433 })) => {
434 if sender
435 .send(PushedFrame {
436 correlation_id,
437 payload,
438 })
439 .is_err()
440 {
441 // The receiver was dropped; nothing will consume further
442 // pushes, so stop reading.
443 return;
444 }
445 }
446 // `Some(_)`: any non-Push frame on a push connection is unexpected for
447 // this spike — ignore it rather than tearing the reader down so a stray
448 // frame cannot silently drop subsequent pushes. `None`: a read timeout
449 // with no complete frame. Both just loop to re-check the stop flag.
450 Ok(Some(_) | None) => {}
451 // Connection closed or a fatal read/decode error: end the thread. The
452 // dropped `sender` surfaces as a `Disconnected` on the receiver side.
453 Err(_) => return,
454 }
455 }
456}
457
458/// Reads until one complete frame decodes, treating a read timeout as
459/// `Ok(None)` so the caller can re-check the stop flag without ending the loop.
460fn next_frame(stream: &mut TcpStream, buffer: &mut Vec<u8>) -> Result<Option<Frame>, SdkError> {
461 loop {
462 match decode(buffer) {
463 Ok((frame, consumed)) => {
464 buffer.drain(..consumed);
465 return Ok(Some(frame));
466 }
467 Err(
468 ProtocolError::IncompleteHeader { .. } | ProtocolError::TruncatedPayload { .. },
469 ) => match fill_buffer(stream, buffer)? {
470 FillOutcome::Read => {}
471 FillOutcome::TimedOut => return Ok(None),
472 },
473 Err(error) => return Err(protocol_error(&error)),
474 }
475 }
476}
477
478/// Reads one complete frame, blocking (no timeout tolerance) — used for the
479/// synchronous handshake and worker-registration replies, before the background
480/// reader starts.
481fn read_one_frame(stream: &mut TcpStream, buffer: &mut Vec<u8>) -> Result<Frame, SdkError> {
482 loop {
483 match decode(buffer) {
484 Ok((frame, consumed)) => {
485 buffer.drain(..consumed);
486 return Ok(frame);
487 }
488 Err(
489 ProtocolError::IncompleteHeader { .. } | ProtocolError::TruncatedPayload { .. },
490 ) => match fill_buffer(stream, buffer)? {
491 FillOutcome::Read => {}
492 FillOutcome::TimedOut => {
493 return Err(SdkError::Connection {
494 description: "push connection timed out waiting for a control-frame reply"
495 .to_string(),
496 });
497 }
498 },
499 Err(error) => return Err(protocol_error(&error)),
500 }
501 }
502}
503
504/// Appends one socket read into `buffer`, mapping a read timeout to a non-fatal
505/// [`FillOutcome::TimedOut`] so the reader can poll the stop flag.
506fn fill_buffer(stream: &mut TcpStream, buffer: &mut Vec<u8>) -> Result<FillOutcome, SdkError> {
507 if buffer.len() > MAX_FRAME_BYTES {
508 return Err(SdkError::Protocol {
509 description: format!(
510 "push frame exceeded {MAX_FRAME_BYTES} bytes without a complete frame"
511 ),
512 });
513 }
514 let mut chunk = [0_u8; READ_CHUNK_BYTES];
515 match stream.read(&mut chunk) {
516 Ok(0) => Err(SdkError::Connection {
517 description: "server closed the push connection".to_string(),
518 }),
519 Ok(read) => {
520 let Some(received) = chunk.get(..read) else {
521 return Err(SdkError::Protocol {
522 description: "push socket read reported more bytes than the buffer holds"
523 .to_string(),
524 });
525 };
526 buffer.extend_from_slice(received);
527 Ok(FillOutcome::Read)
528 }
529 Err(error)
530 if matches!(
531 error.kind(),
532 std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
533 ) =>
534 {
535 Ok(FillOutcome::TimedOut)
536 }
537 Err(error) => Err(SdkError::Connection {
538 description: format!("failed to read from push connection: {error}"),
539 }),
540 }
541}
542
/// Outcome of one non-fatal socket read attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FillOutcome {
    /// At least one byte was read and appended to the frame buffer.
    Read,
    /// The read ended with a `WouldBlock` / `TimedOut` error before any byte
    /// arrived; nothing was appended.
    TimedOut,
}
549
550/// Encodes and writes one frame to the socket, flushing it.
551fn write_frame(stream: &mut TcpStream, frame: &Frame) -> Result<(), SdkError> {
552 let len = encoded_len(frame).map_err(|error| protocol_error(&error))?;
553 let mut bytes = vec![0_u8; len];
554 let written = encode(frame, &mut bytes).map_err(|error| protocol_error(&error))?;
555 let encoded = bytes.get(..written).ok_or_else(|| SdkError::Protocol {
556 description: "push wire encoder reported an invalid byte count".to_string(),
557 })?;
558 stream
559 .write_all(encoded)
560 .map_err(|source| SdkError::Connection {
561 description: format!("failed to write push frame: {source}"),
562 })?;
563 stream.flush().map_err(|source| SdkError::Connection {
564 description: format!("failed to flush push frame: {source}"),
565 })
566}
567
568/// Maps a wire codec error into the SDK error taxonomy.
569fn protocol_error(error: &ProtocolError) -> SdkError {
570 SdkError::Protocol {
571 description: format!("push wire codec error: {error}"),
572 }
573}
574
#[cfg(test)]
mod tests {
    use super::*;
    use liminal::protocol::FrameType;

    /// Encodes `frame`, decodes the produced bytes again, and returns the decoded
    /// frame together with the consumed and written byte counts.
    fn round_trip(frame: &Frame) -> Result<(Frame, usize, usize), SdkError> {
        let capacity = encoded_len(frame).map_err(|error| protocol_error(&error))?;
        let mut wire = vec![0_u8; capacity];
        let written = encode(frame, &mut wire).map_err(|error| protocol_error(&error))?;
        let (decoded, consumed) =
            decode(&wire[..written]).map_err(|error| protocol_error(&error))?;
        Ok((decoded, consumed, written))
    }

    #[test]
    fn pushed_frame_exposes_correlation_and_payload() {
        let pushed = PushedFrame {
            correlation_id: 7,
            payload: vec![1, 2, 3],
        };
        assert_eq!(pushed.correlation_id(), 7);
        assert_eq!(pushed.payload(), &[1, 2, 3]);
        assert_eq!(pushed.into_payload(), vec![1, 2, 3]);
    }

    #[test]
    fn publish_frame_round_trips_through_codec() -> Result<(), SdkError> {
        // This is the frame the observability drain leg puts on the wire: a
        // Publish on the reserved channel whose payload bytes are opaque.
        let outgoing = Frame::new_publish(
            APPLICATION_STREAM_ID,
            OBSERVABILITY_CHANNEL,
            MessageEnvelope::new(
                SchemaId::new([0_u8; SchemaId::WIRE_LEN]),
                CausalContext::independent(),
                vec![9, 9, 9],
            ),
        )
        .map_err(|error| protocol_error(&error))?;

        let (decoded, consumed, written) = round_trip(&outgoing)?;
        assert_eq!(consumed, written);
        assert_eq!(decoded.frame_type(), FrameType::Publish);
        match decoded {
            Frame::Publish {
                channel, envelope, ..
            } => {
                assert_eq!(channel, OBSERVABILITY_CHANNEL);
                assert_eq!(envelope.payload, vec![9, 9, 9]);
                Ok(())
            }
            _ => Err(SdkError::Protocol {
                description: "expected a Publish frame".to_string(),
            }),
        }
    }

    #[test]
    fn reply_frame_round_trips_through_codec() -> Result<(), SdkError> {
        let outgoing = Frame::new_push_reply(APPLICATION_STREAM_ID, 9, vec![4, 5])
            .map_err(|error| protocol_error(&error))?;
        let (decoded, consumed, written) = round_trip(&outgoing)?;
        assert_eq!(consumed, written);
        assert_eq!(decoded.frame_type(), FrameType::PushReply);
        Ok(())
    }
}
635}