running_process/broker/backend_sdk/frame_client.rs
1//! Blocking frame client with built-in request correlation (#412).
2//!
3//! Before #412, every consumer invented a per-connection request-id
4//! counter and re-implemented send/receive plumbing for the v1 frame
5//! wire. `FrameClient` owns both: it assigns monotonically increasing
6//! request ids, frames the payload, and validates that the response
7//! echoes the id and payload protocol.
8//!
9//! This client is **blocking**. The default `client` cargo feature
10//! carries no async runtime, so async daemons either wrap calls in
11//! their runtime's `spawn_blocking` or enable the `client-async`
12//! feature and use the async twin
13//! [`AsyncFrameClient`](super::AsyncFrameClient) instead (#414).
14//! Calling [`FrameClient::request`] from a tokio task without
15//! `spawn_blocking` will block the runtime worker thread.
16//!
17//! [`BackendHandle::probe_with_service`]: crate::broker::backend_handle::BackendHandle::probe_with_service
18
19use std::io;
20
21use prost::Message;
22
23use crate::broker::protocol::{read_frame, write_frame, Endpoint, Frame, FrameKind, FramingError};
24
25/// Blocking request/response client for a backend daemon's frame lane.
26///
27/// ```no_run
28/// use running_process::broker::backend_sdk::FrameClient;
29/// use running_process::broker::protocol::Endpoint;
30///
31/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
32/// let endpoint = Endpoint::unix_socket("my-daemon", "/tmp/my-daemon.sock")?;
33/// let mut client = FrameClient::connect(&endpoint)?;
34/// let response = client.request(0x7A63, b"ping".to_vec())?;
35/// assert_eq!(response.payload, b"pong");
36/// # Ok(())
37/// # }
38/// ```
39pub struct FrameClient {
40 stream: io::BufReader<interprocess::local_socket::Stream>,
41 next_request_id: u64,
42}
43
44impl FrameClient {
45 /// Connect to a backend endpoint using the platform local-socket
46 /// name type (bare pipe name on Windows, filesystem path on Unix).
47 pub fn connect(endpoint: &Endpoint) -> Result<Self, FrameClientError> {
48 let connection = crate::broker::backend_handle::Connection::connect(endpoint)
49 .map_err(FrameClientError::Connect)?;
50 Ok(Self::from_stream(connection.into_inner()))
51 }
52
53 /// Wrap an already-connected local-socket stream (e.g. one opened
54 /// through a verified
55 /// [`BackendHandle`](crate::broker::backend_handle::BackendHandle)).
56 pub fn from_stream(stream: interprocess::local_socket::Stream) -> Self {
57 Self {
58 stream: io::BufReader::new(stream),
59 next_request_id: 1,
60 }
61 }
62
63 /// Send one request frame and block until its response arrives.
64 ///
65 /// Assigns the next request id, sends
66 /// `Frame::request(payload_protocol, payload)`, then reads frames
67 /// until one echoes the id. The returned frame is validated to be a
68 /// `RESPONSE` on the same payload protocol.
69 pub fn request(
70 &mut self,
71 payload_protocol: u32,
72 payload: Vec<u8>,
73 ) -> Result<Frame, FrameClientError> {
74 let request_id = self.next_request_id;
75 self.next_request_id = self.next_request_id.wrapping_add(1).max(1);
76
77 let frame = Frame::request(payload_protocol, payload).with_request_id(request_id);
78 let mut body = Vec::with_capacity(frame.encoded_len());
79 frame
80 .encode(&mut body)
81 .expect("prost encoding into Vec cannot fail because Vec writes are infallible");
82 write_frame(self.stream.get_mut(), &body)?;
83
84 let response_bytes = read_frame(&mut self.stream)?;
85 let response =
86 Frame::decode(response_bytes.as_slice()).map_err(FrameClientError::Decode)?;
87 if response.request_id != request_id {
88 return Err(FrameClientError::RequestIdMismatch {
89 expected: request_id,
90 got: response.request_id,
91 });
92 }
93 if response.payload_protocol != payload_protocol {
94 return Err(FrameClientError::PayloadProtocolMismatch {
95 expected: payload_protocol,
96 got: response.payload_protocol,
97 });
98 }
99 if FrameKind::try_from(response.kind) != Ok(FrameKind::Response) {
100 return Err(FrameClientError::NotAResponse {
101 kind: response.kind,
102 });
103 }
104 Ok(response)
105 }
106
107 /// The request id the next [`Self::request`] call will use.
108 pub fn next_request_id(&self) -> u64 {
109 self.next_request_id
110 }
111
112 /// Bytes the internal frame reader has buffered but not yet consumed.
113 ///
114 /// Zero on a client that has issued no [`Self::request`]. A consumer that
115 /// wants to take the raw socket back out via [`Self::into_stream`] checks
116 /// this first: nonzero means there is response data the bare socket would
117 /// not carry, so the take must be refused.
118 pub fn buffered_len(&self) -> usize {
119 self.stream.buffer().len()
120 }
121
122 /// Consume the client and return the underlying local-socket stream.
123 ///
124 /// Hands the negotiated socket back to a consumer that will speak its own
125 /// wire over it (see [`BrokerSession::into_backend_io`]). Any bytes still
126 /// buffered by the frame reader are dropped, so callers must verify
127 /// [`Self::buffered_len`] is zero before calling — which it always is on a
128 /// freshly adopted session that has issued no request.
129 ///
130 /// [`BrokerSession::into_backend_io`]: crate::broker::adopt::BrokerSession::into_backend_io
131 pub fn into_stream(self) -> interprocess::local_socket::Stream {
132 self.stream.into_inner()
133 }
134}
135
136/// Errors returned by [`FrameClient`].
137#[derive(Debug, thiserror::Error)]
138pub enum FrameClientError {
139 /// Opening the IPC connection failed.
140 #[error("frame client connect failed: {0}")]
141 Connect(io::Error),
142 /// v1 framing failed on the wire.
143 #[error(transparent)]
144 Framing(#[from] FramingError),
145 /// The response body was not a valid prost `Frame`.
146 #[error("failed to decode response Frame: {0}")]
147 Decode(prost::DecodeError),
148 /// The response did not echo the request id.
149 #[error("response request_id {got} does not match request {expected}")]
150 RequestIdMismatch {
151 /// The id this client assigned to the request.
152 expected: u64,
153 /// The id the peer echoed.
154 got: u64,
155 },
156 /// The response switched payload protocols mid-correlation.
157 #[error("response payload_protocol {got:#06X} does not match request {expected:#06X}")]
158 PayloadProtocolMismatch {
159 /// The payload protocol the request used.
160 expected: u32,
161 /// The payload protocol the peer answered with.
162 got: u32,
163 },
164 /// The correlated frame was not a `RESPONSE`.
165 #[error("correlated frame kind {kind} is not RESPONSE")]
166 NotAResponse {
167 /// The raw frame kind received.
168 kind: i32,
169 },
170}