Skip to main content

liminal_sdk/remote/tcp/
push_client.rs

1//! Client-side background reader for server-initiated pushes.
2//!
3//! Every other SDK transport call is request/response: the client writes a frame
4//! and reads exactly one reply to its own request ([`Connection::round_trip`]). A
5//! server PUSH inverts that — the server writes a [`Frame::Push`] on the client's
6//! existing connection at a time of the server's choosing, with no outstanding
7//! client request to read it. [`PushClient`] is the piece that consumes those
8//! inbound frames: it owns a connection whose socket is drained by a dedicated
9//! background reader thread, surfaces each pushed frame on a channel, and lets the
10//! caller send back a correlated [`Frame::PushReply`] on the same socket.
11//!
12//! # Read/write split
13//!
14//! A push connection is read concurrently (the background thread blocks on the
15//! socket) and written concurrently (the caller replies). `TcpStream` is cloned so
16//! the reader thread owns one handle and the writer holds the other behind a
17//! `Mutex`; the two handles share the same underlying socket, so a reply written
18//! by the caller travels the connection the server is pushing on. This keeps the
19//! request/reply [`Connection`] (which couples a single read to a single write)
20//! completely untouched — the push path is additive, not a rewrite.
21
22use alloc::format;
23use alloc::string::ToString;
24use alloc::sync::Arc;
25use alloc::vec;
26use alloc::vec::Vec;
27use core::time::Duration;
28
29use std::io::{Read, Write};
30use std::net::TcpStream;
31use std::sync::Mutex;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender, channel};
34use std::thread::JoinHandle;
35
36use liminal::protocol::{
37    CausalContext, Frame, MessageEnvelope, ProtocolError, ProtocolVersion, SchemaId,
38    WorkerRegisterOutcome, WorkerRegistration, decode, encode, encoded_len,
39};
40
41use crate::SdkError;
42
43/// Minimum protocol version this client advertises during the handshake.
44const CLIENT_MIN_VERSION: ProtocolVersion = ProtocolVersion::new(1, 0);
45/// Maximum protocol version this client advertises during the handshake.
46const CLIENT_MAX_VERSION: ProtocolVersion = ProtocolVersion::new(1, 0);
47/// Bound on a single socket write.
48const WRITE_TIMEOUT: Duration = Duration::from_secs(5);
49/// Poll cadence the reader thread uses so it can observe the stop flag promptly
50/// between reads while still blocking efficiently on the socket the rest of the
51/// time.
52const READER_POLL_TIMEOUT: Duration = Duration::from_millis(100);
53/// Read chunk size used when draining the socket into the frame buffer.
54const READ_CHUNK_BYTES: usize = 4096;
55/// Upper bound on a single buffered frame, guarding against runaway buffering.
56const MAX_FRAME_BYTES: usize = 16 * 1024 * 1024;
57/// Application stream id used for the client's push reply frames.
58const APPLICATION_STREAM_ID: u32 = 1;
59
60/// The reserved channel a worker publishes agent-observability events to over its
61/// existing push connection.
62///
63/// It is NOT a general pub/sub channel: the server routes a publish on this exact
64/// channel name straight to its `ConnectionNotifier` observability hook (bypassing
65/// the channel-fan-out cluster), so a worker never needs a second connection to
66/// stream a transcript. The name is a wire contract shared by the worker publisher
67/// and the server's demux, so it is pinned here as the single source of truth.
68pub const OBSERVABILITY_CHANNEL: &str = "aion.observability.v1";
69
70/// A frame the server pushed to this client.
71#[derive(Clone, Debug, PartialEq, Eq)]
72pub struct PushedFrame {
73    /// Correlation id the server assigned; echo it on the reply.
74    correlation_id: u64,
75    /// Opaque payload bytes the server pushed.
76    payload: Vec<u8>,
77}
78
79impl PushedFrame {
80    /// Correlation id to echo back on the reply so the server matches it.
81    #[must_use]
82    pub const fn correlation_id(&self) -> u64 {
83        self.correlation_id
84    }
85
86    /// Opaque payload bytes the server pushed.
87    #[must_use]
88    pub fn payload(&self) -> &[u8] {
89        &self.payload
90    }
91
92    /// Consumes the frame, returning the owned payload bytes.
93    #[must_use]
94    pub fn into_payload(self) -> Vec<u8> {
95        self.payload
96    }
97}
98
99/// A connected client that consumes server pushes and sends correlated replies.
100///
101/// Construct with [`PushClient::connect`]; the background reader starts
102/// immediately and runs until the client is dropped. Pull pushed frames with
103/// [`PushClient::recv_timeout`] and answer them with [`PushClient::reply`].
104#[derive(Debug)]
105pub struct PushClient {
106    /// Write half of the shared socket, guarded so the caller's reply does not
107    /// interleave bytes with any other writer.
108    writer: Arc<Mutex<TcpStream>>,
109    /// Inbound pushed frames surfaced by the background reader.
110    inbound: Receiver<PushedFrame>,
111    /// Signals the reader thread to stop; set on drop.
112    stop: Arc<AtomicBool>,
113    /// Background reader handle, joined on drop.
114    reader: Option<JoinHandle<()>>,
115}
116
117impl PushClient {
118    /// Connects to `address`, performs the protocol handshake, and starts the
119    /// background reader that drains inbound server pushes.
120    ///
121    /// # Errors
122    ///
123    /// Returns [`SdkError::Connection`] when the TCP connection or socket
124    /// configuration fails, and [`SdkError::Protocol`] when the handshake is
125    /// rejected or the socket cannot be cloned for the reader thread.
126    pub fn connect(address: &str) -> Result<Self, SdkError> {
127        let mut stream = connect_socket(address)?;
128        handshake(&mut stream)?;
129        Self::start_reader(stream)
130    }
131
132    /// Connects, performs the handshake, then synchronously registers this client
133    /// as a worker before starting the background reader.
134    ///
135    /// This mirrors the synchronous `Connect`/`ConnectAck` pattern: the
136    /// `WorkerRegister` frame is written and its [`Frame::WorkerRegisterAck`] read
137    /// on the calling thread, BEFORE the Push-only background reader is spawned, so
138    /// the ack is never swallowed by the reader. A connect-variant (rather than a
139    /// `register()` method on a connected client) is the cleanest fit: `connect`
140    /// spawns the reader as its last step, so registration must be threaded into
141    /// the connect sequence to land before that spawn; a post-connect method would
142    /// race the already-running reader for the ack frame.
143    ///
144    /// # Errors
145    ///
146    /// Returns [`SdkError::Connection`] when the TCP connection or socket
147    /// configuration fails, and [`SdkError::Protocol`] when the handshake is
148    /// rejected, the server rejects the registration (the rejection reason is
149    /// carried in the error), or the socket cannot be cloned for the reader thread.
150    pub fn connect_with_registration(
151        address: &str,
152        registration: WorkerRegistration,
153    ) -> Result<Self, SdkError> {
154        let mut stream = connect_socket(address)?;
155        handshake(&mut stream)?;
156        register(&mut stream, registration)?;
157        Self::start_reader(stream)
158    }
159
160    /// Spawns the Push-only background reader over a handshaken (and, for a worker,
161    /// already-registered) stream and returns the running client.
162    fn start_reader(stream: TcpStream) -> Result<Self, SdkError> {
163        // Clone the socket so the reader thread owns one handle and the writer
164        // holds the other; both refer to the same underlying connection.
165        let read_stream = stream.try_clone().map_err(|source| SdkError::Protocol {
166            description: format!("failed to clone push socket for reader thread: {source}"),
167        })?;
168
169        let stop = Arc::new(AtomicBool::new(false));
170        let (sender, inbound) = channel();
171        let reader_stop = Arc::clone(&stop);
172        let reader = std::thread::Builder::new()
173            .name("liminal-push-reader".to_string())
174            .spawn(move || run_reader(read_stream, &sender, &reader_stop))
175            .map_err(|source| SdkError::Protocol {
176                description: format!("failed to start push reader thread: {source}"),
177            })?;
178
179        Ok(Self {
180            writer: Arc::new(Mutex::new(stream)),
181            inbound,
182            stop,
183            reader: Some(reader),
184        })
185    }
186
187    /// Blocks up to `timeout` for the next pushed frame from the server.
188    ///
189    /// # Errors
190    ///
191    /// Returns [`SdkError::Connection`] when no push arrives within `timeout` or
192    /// the background reader has stopped (e.g. the server closed the connection).
193    pub fn recv_timeout(&self, timeout: Duration) -> Result<PushedFrame, SdkError> {
194        self.inbound.recv_timeout(timeout).map_err(|error| {
195            let detail = match error {
196                RecvTimeoutError::Timeout => "no server push arrived within the timeout",
197                RecvTimeoutError::Disconnected => {
198                    "the push reader stopped before a server push arrived"
199                }
200            };
201            SdkError::Connection {
202                description: format!("push receive failed: {detail}"),
203            }
204        })
205    }
206
207    /// Sends a correlated reply to a pushed frame, echoing its correlation id so
208    /// the server matches the reply back to the originating push.
209    ///
210    /// # Errors
211    ///
212    /// Returns [`SdkError::Protocol`] when the reply frame cannot be encoded and
213    /// [`SdkError::Connection`] when it cannot be written to the socket or the
214    /// writer lock is poisoned.
215    pub fn reply(&self, correlation_id: u64, payload: Vec<u8>) -> Result<(), SdkError> {
216        let frame = Frame::new_push_reply(APPLICATION_STREAM_ID, correlation_id, payload)
217            .map_err(|error| protocol_error(&error))?;
218        let mut writer = self.writer.lock().map_err(|error| SdkError::Connection {
219            description: format!("push writer lock poisoned: {error}"),
220        })?;
221        write_frame(&mut writer, &frame)
222    }
223
224    /// A cheap, cloneable handle to this push connection's write half, for
225    /// background tasks that publish out-of-band frames on the same socket without
226    /// owning the full client (which cannot be cloned — it holds the reader thread
227    /// join handle).
228    ///
229    /// The returned [`PushWriter`] shares the client's `Arc<Mutex<TcpStream>>`, so a
230    /// frame it writes travels the SAME connection the server pushes on. It is the
231    /// worker's observability-drain leg: a drain task holds one and publishes each
232    /// [`OBSERVABILITY_CHANNEL`] event live while the client keeps serving pushes.
233    #[must_use]
234    pub fn writer_handle(&self) -> PushWriter {
235        PushWriter {
236            writer: Arc::clone(&self.writer),
237        }
238    }
239
240    /// Publish `payload` to `channel` over this connection (out-of-band from the
241    /// push/reply round trip).
242    ///
243    /// Convenience shorthand for `self.writer_handle().publish(channel, payload)`.
244    ///
245    /// # Errors
246    ///
247    /// Returns [`SdkError::Protocol`] when the publish frame cannot be encoded and
248    /// [`SdkError::Connection`] when it cannot be written to the socket or the
249    /// writer lock is poisoned.
250    pub fn publish(&self, channel: &str, payload: Vec<u8>) -> Result<(), SdkError> {
251        self.writer_handle().publish(channel, payload)
252    }
253}
254
255/// A cheap clone of a [`PushClient`]'s write half.
256///
257/// It writes `Frame::Publish` frames on the SAME socket the client receives pushes
258/// on, so a background drain task can stream observability events upstream without a
259/// second connection. Cloning is an `Arc` bump; the underlying socket and its write
260/// lock are shared with the originating [`PushClient`].
261#[derive(Clone, Debug)]
262pub struct PushWriter {
263    writer: Arc<Mutex<TcpStream>>,
264}
265
266impl PushWriter {
267    /// Publish `payload` to `channel` on the shared connection.
268    ///
269    /// Writes a single `Frame::Publish` carrying the opaque bytes verbatim (schema
270    /// id zero, an independent causal context — the server routes the reserved
271    /// observability channel straight to its notifier hook, so no schema negotiation
272    /// or ordering context is required). The write takes the shared writer lock, so
273    /// it never interleaves bytes with a concurrent push reply.
274    ///
275    /// # Errors
276    ///
277    /// Returns [`SdkError::Protocol`] when the publish frame cannot be encoded and
278    /// [`SdkError::Connection`] when it cannot be written to the socket or the writer
279    /// lock is poisoned.
280    pub fn publish(&self, channel: &str, payload: Vec<u8>) -> Result<(), SdkError> {
281        let envelope = MessageEnvelope::new(
282            SchemaId::new([0_u8; SchemaId::WIRE_LEN]),
283            CausalContext::independent(),
284            payload,
285        );
286        let frame = Frame::new_publish(APPLICATION_STREAM_ID, channel, envelope)
287            .map_err(|error| protocol_error(&error))?;
288        let mut writer = self.writer.lock().map_err(|error| SdkError::Connection {
289            description: format!("push writer lock poisoned: {error}"),
290        })?;
291        write_frame(&mut writer, &frame)
292    }
293
294    /// Send a correlated reply to a server push on the shared connection, echoing the
295    /// push's `correlation_id` so the server matches the reply to its push.
296    ///
297    /// Identical wire effect to [`PushClient::reply`], but issued from a cheap
298    /// [`PushWriter`] clone so a BACKGROUND task (e.g. a long-running agent dispatch)
299    /// can answer its own push after it completes, without holding the full client or
300    /// blocking the serve loop. Shares the writer lock, so it never interleaves bytes
301    /// with a concurrent publish or reply.
302    ///
303    /// # Errors
304    ///
305    /// Returns [`SdkError::Protocol`] when the reply frame cannot be encoded and
306    /// [`SdkError::Connection`] when it cannot be written to the socket or the writer
307    /// lock is poisoned.
308    pub fn reply(&self, correlation_id: u64, payload: Vec<u8>) -> Result<(), SdkError> {
309        let frame = Frame::new_push_reply(APPLICATION_STREAM_ID, correlation_id, payload)
310            .map_err(|error| protocol_error(&error))?;
311        let mut writer = self.writer.lock().map_err(|error| SdkError::Connection {
312            description: format!("push writer lock poisoned: {error}"),
313        })?;
314        write_frame(&mut writer, &frame)
315    }
316}
317
318impl Drop for PushClient {
319    fn drop(&mut self) {
320        self.stop.store(true, Ordering::SeqCst);
321        if let Some(reader) = self.reader.take() {
322            // The reader wakes within READER_POLL_TIMEOUT to observe the stop flag,
323            // so this join does not hang on a quiet connection.
324            reader.join().ok();
325        }
326    }
327}
328
329/// Opens and configures the push-client socket (Nagle off, bounded read/write
330/// timeouts) before any framing.
331fn connect_socket(address: &str) -> Result<TcpStream, SdkError> {
332    let stream = TcpStream::connect(address).map_err(|source| SdkError::Connection {
333        description: format!("failed to connect push client to {address}: {source}"),
334    })?;
335    stream
336        .set_nodelay(true)
337        .map_err(|source| SdkError::Connection {
338            description: format!("failed to disable Nagle for {address}: {source}"),
339        })?;
340    // A bounded read timeout lets the reader thread wake to check the stop flag
341    // even when the server is silent; without it the thread would block forever
342    // on a quiet connection and never observe drop.
343    stream
344        .set_read_timeout(Some(READER_POLL_TIMEOUT))
345        .map_err(|source| SdkError::Connection {
346            description: format!("failed to set push read timeout for {address}: {source}"),
347        })?;
348    stream
349        .set_write_timeout(Some(WRITE_TIMEOUT))
350        .map_err(|source| SdkError::Connection {
351            description: format!("failed to set push write timeout for {address}: {source}"),
352        })?;
353    Ok(stream)
354}
355
356/// Drives the synchronous worker-registration round trip
357/// (`WorkerRegister` -> `WorkerRegisterAck`) on a handshaken socket, before the
358/// background reader is spawned.
359///
360/// A `Rejected` ack maps to a typed [`SdkError::Protocol`] carrying the server's
361/// reason; any non-ack reply is a protocol error.
362fn register(stream: &mut TcpStream, registration: WorkerRegistration) -> Result<(), SdkError> {
363    let frame = Frame::WorkerRegister {
364        flags: 0,
365        registration,
366    };
367    write_frame(stream, &frame)?;
368    let mut buffer = Vec::new();
369    match read_one_frame(stream, &mut buffer)? {
370        Frame::WorkerRegisterAck {
371            outcome: WorkerRegisterOutcome::Accepted,
372            ..
373        } => Ok(()),
374        Frame::WorkerRegisterAck {
375            outcome: WorkerRegisterOutcome::Rejected { reason },
376            ..
377        } => Err(SdkError::Protocol {
378            description: format!("server rejected worker registration: {reason}"),
379        }),
380        other => Err(SdkError::Protocol {
381            description: format!(
382                "expected WorkerRegisterAck during registration, received {:?}",
383                other.frame_type()
384            ),
385        }),
386    }
387}
388
389/// Drives the client handshake (`Connect` -> `ConnectAck`) on a fresh socket.
390fn handshake(stream: &mut TcpStream) -> Result<(), SdkError> {
391    let connect = Frame::Connect {
392        flags: 0,
393        min_version: CLIENT_MIN_VERSION,
394        max_version: CLIENT_MAX_VERSION,
395        auth_token: Vec::new(),
396    };
397    write_frame(stream, &connect)?;
398    let mut buffer = Vec::new();
399    match read_one_frame(stream, &mut buffer)? {
400        Frame::ConnectAck { .. } => Ok(()),
401        Frame::ConnectError {
402            reason_code,
403            message,
404            ..
405        } => Err(SdkError::Connection {
406            description: format!(
407                "server rejected push connection (reason {reason_code}): {}",
408                message.unwrap_or_else(|| "no detail".to_string())
409            ),
410        }),
411        other => Err(SdkError::Protocol {
412            description: format!(
413                "expected ConnectAck during push handshake, received {:?}",
414                other.frame_type()
415            ),
416        }),
417    }
418}
419
420/// Background loop: drains the socket, surfacing each `Push` frame on `sender`.
421///
422/// Returns (ending the thread) when the stop flag is set, the connection closes,
423/// or a fatal decode/IO error occurs. A read timeout is non-fatal: it just lets
424/// the loop re-check the stop flag.
425fn run_reader(mut stream: TcpStream, sender: &Sender<PushedFrame>, stop: &AtomicBool) {
426    let mut buffer = Vec::new();
427    while !stop.load(Ordering::SeqCst) {
428        match next_frame(&mut stream, &mut buffer) {
429            Ok(Some(Frame::Push {
430                correlation_id,
431                payload,
432                ..
433            })) => {
434                if sender
435                    .send(PushedFrame {
436                        correlation_id,
437                        payload,
438                    })
439                    .is_err()
440                {
441                    // The receiver was dropped; nothing will consume further
442                    // pushes, so stop reading.
443                    return;
444                }
445            }
446            // `Some(_)`: any non-Push frame on a push connection is unexpected for
447            // this spike — ignore it rather than tearing the reader down so a stray
448            // frame cannot silently drop subsequent pushes. `None`: a read timeout
449            // with no complete frame. Both just loop to re-check the stop flag.
450            Ok(Some(_) | None) => {}
451            // Connection closed or a fatal read/decode error: end the thread. The
452            // dropped `sender` surfaces as a `Disconnected` on the receiver side.
453            Err(_) => return,
454        }
455    }
456}
457
458/// Reads until one complete frame decodes, treating a read timeout as
459/// `Ok(None)` so the caller can re-check the stop flag without ending the loop.
460fn next_frame(stream: &mut TcpStream, buffer: &mut Vec<u8>) -> Result<Option<Frame>, SdkError> {
461    loop {
462        match decode(buffer) {
463            Ok((frame, consumed)) => {
464                buffer.drain(..consumed);
465                return Ok(Some(frame));
466            }
467            Err(
468                ProtocolError::IncompleteHeader { .. } | ProtocolError::TruncatedPayload { .. },
469            ) => match fill_buffer(stream, buffer)? {
470                FillOutcome::Read => {}
471                FillOutcome::TimedOut => return Ok(None),
472            },
473            Err(error) => return Err(protocol_error(&error)),
474        }
475    }
476}
477
478/// Reads one complete frame, blocking (no timeout tolerance) — used for the
479/// synchronous handshake and worker-registration replies, before the background
480/// reader starts.
481fn read_one_frame(stream: &mut TcpStream, buffer: &mut Vec<u8>) -> Result<Frame, SdkError> {
482    loop {
483        match decode(buffer) {
484            Ok((frame, consumed)) => {
485                buffer.drain(..consumed);
486                return Ok(frame);
487            }
488            Err(
489                ProtocolError::IncompleteHeader { .. } | ProtocolError::TruncatedPayload { .. },
490            ) => match fill_buffer(stream, buffer)? {
491                FillOutcome::Read => {}
492                FillOutcome::TimedOut => {
493                    return Err(SdkError::Connection {
494                        description: "push connection timed out waiting for a control-frame reply"
495                            .to_string(),
496                    });
497                }
498            },
499            Err(error) => return Err(protocol_error(&error)),
500        }
501    }
502}
503
504/// Appends one socket read into `buffer`, mapping a read timeout to a non-fatal
505/// [`FillOutcome::TimedOut`] so the reader can poll the stop flag.
506fn fill_buffer(stream: &mut TcpStream, buffer: &mut Vec<u8>) -> Result<FillOutcome, SdkError> {
507    if buffer.len() > MAX_FRAME_BYTES {
508        return Err(SdkError::Protocol {
509            description: format!(
510                "push frame exceeded {MAX_FRAME_BYTES} bytes without a complete frame"
511            ),
512        });
513    }
514    let mut chunk = [0_u8; READ_CHUNK_BYTES];
515    match stream.read(&mut chunk) {
516        Ok(0) => Err(SdkError::Connection {
517            description: "server closed the push connection".to_string(),
518        }),
519        Ok(read) => {
520            let Some(received) = chunk.get(..read) else {
521                return Err(SdkError::Protocol {
522                    description: "push socket read reported more bytes than the buffer holds"
523                        .to_string(),
524                });
525            };
526            buffer.extend_from_slice(received);
527            Ok(FillOutcome::Read)
528        }
529        Err(error)
530            if matches!(
531                error.kind(),
532                std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
533            ) =>
534        {
535            Ok(FillOutcome::TimedOut)
536        }
537        Err(error) => Err(SdkError::Connection {
538            description: format!("failed to read from push connection: {error}"),
539        }),
540    }
541}
542
543/// Outcome of one non-fatal socket read attempt.
544#[derive(Debug, Clone, Copy, PartialEq, Eq)]
545enum FillOutcome {
546    Read,
547    TimedOut,
548}
549
550/// Encodes and writes one frame to the socket, flushing it.
551fn write_frame(stream: &mut TcpStream, frame: &Frame) -> Result<(), SdkError> {
552    let len = encoded_len(frame).map_err(|error| protocol_error(&error))?;
553    let mut bytes = vec![0_u8; len];
554    let written = encode(frame, &mut bytes).map_err(|error| protocol_error(&error))?;
555    let encoded = bytes.get(..written).ok_or_else(|| SdkError::Protocol {
556        description: "push wire encoder reported an invalid byte count".to_string(),
557    })?;
558    stream
559        .write_all(encoded)
560        .map_err(|source| SdkError::Connection {
561            description: format!("failed to write push frame: {source}"),
562        })?;
563    stream.flush().map_err(|source| SdkError::Connection {
564        description: format!("failed to flush push frame: {source}"),
565    })
566}
567
568/// Maps a wire codec error into the SDK error taxonomy.
569fn protocol_error(error: &ProtocolError) -> SdkError {
570    SdkError::Protocol {
571        description: format!("push wire codec error: {error}"),
572    }
573}
574
575#[cfg(test)]
576mod tests {
577    use super::*;
578    use liminal::protocol::FrameType;
579
580    #[test]
581    fn pushed_frame_exposes_correlation_and_payload() {
582        let frame = PushedFrame {
583            correlation_id: 7,
584            payload: vec![1, 2, 3],
585        };
586        assert_eq!(frame.correlation_id(), 7);
587        assert_eq!(frame.payload(), &[1, 2, 3]);
588        assert_eq!(frame.into_payload(), vec![1, 2, 3]);
589    }
590
591    #[test]
592    fn publish_frame_round_trips_through_codec() -> Result<(), SdkError> {
593        // The observability publish frame the drain leg writes: a Publish on the
594        // reserved channel carrying opaque payload bytes verbatim.
595        let envelope = MessageEnvelope::new(
596            SchemaId::new([0_u8; SchemaId::WIRE_LEN]),
597            CausalContext::independent(),
598            vec![9, 9, 9],
599        );
600        let frame = Frame::new_publish(APPLICATION_STREAM_ID, OBSERVABILITY_CHANNEL, envelope)
601            .map_err(|error| protocol_error(&error))?;
602        let len = encoded_len(&frame).map_err(|error| protocol_error(&error))?;
603        let mut bytes = vec![0_u8; len];
604        let written = encode(&frame, &mut bytes).map_err(|error| protocol_error(&error))?;
605        let (decoded, consumed) =
606            decode(&bytes[..written]).map_err(|error| protocol_error(&error))?;
607        assert_eq!(consumed, written);
608        assert_eq!(decoded.frame_type(), FrameType::Publish);
609        let Frame::Publish {
610            channel, envelope, ..
611        } = decoded
612        else {
613            return Err(SdkError::Protocol {
614                description: "expected a Publish frame".to_string(),
615            });
616        };
617        assert_eq!(channel, OBSERVABILITY_CHANNEL);
618        assert_eq!(envelope.payload, vec![9, 9, 9]);
619        Ok(())
620    }
621
622    #[test]
623    fn reply_frame_round_trips_through_codec() -> Result<(), SdkError> {
624        let frame = Frame::new_push_reply(APPLICATION_STREAM_ID, 9, vec![4, 5])
625            .map_err(|error| protocol_error(&error))?;
626        let len = encoded_len(&frame).map_err(|error| protocol_error(&error))?;
627        let mut bytes = vec![0_u8; len];
628        let written = encode(&frame, &mut bytes).map_err(|error| protocol_error(&error))?;
629        let (decoded, consumed) =
630            decode(&bytes[..written]).map_err(|error| protocol_error(&error))?;
631        assert_eq!(consumed, written);
632        assert_eq!(decoded.frame_type(), FrameType::PushReply);
633        Ok(())
634    }
635}