microsandbox_agent_client/transport.rs
1//! Transport packet abstraction for agent protocol frames.
2//!
3//! The transport layer is intentionally CBOR-blind. It moves complete
4//! length-prefixed packets and leaves message-type validation to higher layers.
5
6use std::future::Future;
7use std::pin::Pin;
8
9use microsandbox_protocol::codec::{self, RawFrame};
10use tokio::io::{AsyncRead, AsyncWrite};
11
12use crate::error::{AgentClientError, AgentClientResult};
13
14//--------------------------------------------------------------------------------------------------
15// Types
16//--------------------------------------------------------------------------------------------------
17
/// Exact bytes sent over an agent transport.
///
/// A packet holds the four-byte length prefix followed by one binary frame:
/// `[len: u32 BE][id: u32 BE][flags: u8][body...]`.
///
/// Equality is byte-wise over the complete wire representation, so two
/// packets compare equal only when they carry identical bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportPacket {
    /// Complete wire bytes, length prefix included.
    bytes: Vec<u8>,
}
26
27/// Bidirectional packet transport for the agent protocol.
28///
29/// Custom transports can implement this trait when they can preserve exact
30/// packet boundaries. Byte-stream transports may use
31/// [`read_packet_from_io`] and [`write_packet_to_io`].
32pub trait AgentTransport: Send + Unpin + 'static {
33 /// Read the next packet. Returns `None` when the transport reaches EOF.
34 fn read_packet(
35 &mut self,
36 ) -> Pin<Box<dyn Future<Output = AgentClientResult<Option<TransportPacket>>> + Send + '_>>;
37
38 /// Write one packet to the transport.
39 fn write_packet(
40 &mut self,
41 packet: TransportPacket,
42 ) -> Pin<Box<dyn Future<Output = AgentClientResult<()>> + Send + '_>>;
43}
44
45//--------------------------------------------------------------------------------------------------
46// Methods
47//--------------------------------------------------------------------------------------------------
48
49impl TransportPacket {
50 /// Validate and wrap exact wire bytes.
51 ///
52 /// The input must contain exactly one complete transport packet. It may be
53 /// used by unchecked write paths, but it is still structurally validated so
54 /// callers cannot accidentally concatenate packets or pass a truncated
55 /// frame.
56 pub fn from_bytes(bytes: impl Into<Vec<u8>>) -> AgentClientResult<Self> {
57 let bytes = bytes.into();
58 let mut buf = bytes.clone();
59 let Some(_frame) = codec::try_decode_raw_from_buf(&mut buf)? else {
60 return Err(AgentClientError::InvalidPacket(
61 "packet does not contain a complete frame".to_string(),
62 ));
63 };
64 if !buf.is_empty() {
65 return Err(AgentClientError::InvalidPacket(
66 "packet contains trailing bytes".to_string(),
67 ));
68 }
69 Ok(Self { bytes })
70 }
71
72 /// Create a packet from a structured raw frame.
73 ///
74 /// The frame body is left opaque; this method only applies the binary
75 /// transport framing.
76 pub fn from_frame(frame: &RawFrame) -> AgentClientResult<Self> {
77 let mut bytes = Vec::new();
78 codec::encode_raw_to_buf(frame, &mut bytes)?;
79 Ok(Self { bytes })
80 }
81
82 /// Borrow the exact transport bytes.
83 pub fn as_bytes(&self) -> &[u8] {
84 &self.bytes
85 }
86
87 /// Consume the packet and return its exact transport bytes.
88 pub fn into_bytes(self) -> Vec<u8> {
89 self.bytes
90 }
91}
92
93//--------------------------------------------------------------------------------------------------
94// Functions
95//--------------------------------------------------------------------------------------------------
96
97/// Read one length-prefixed packet from a byte stream.
98///
99/// Returns `Ok(None)` on clean EOF before a new packet begins.
100pub async fn read_packet_from_io<R>(reader: &mut R) -> AgentClientResult<Option<TransportPacket>>
101where
102 R: AsyncRead + Unpin,
103{
104 match codec::read_raw_frame(reader).await {
105 Ok(frame) => TransportPacket::from_frame(&frame).map(Some),
106 Err(microsandbox_protocol::ProtocolError::UnexpectedEof) => Ok(None),
107 Err(error) => Err(error.into()),
108 }
109}
110
111/// Write one packet to a byte stream.
112pub async fn write_packet_to_io<W>(writer: &mut W, packet: TransportPacket) -> AgentClientResult<()>
113where
114 W: AsyncWrite + Unpin,
115{
116 use tokio::io::AsyncWriteExt;
117
118 writer.write_all(packet.as_bytes()).await?;
119 writer.flush().await?;
120 Ok(())
121}