Skip to main content

running_process/broker/backend_sdk/
frame_client.rs

1//! Blocking frame client with built-in request correlation (#412).
2//!
3//! Before #412, every consumer invented a per-connection request-id
4//! counter and re-implemented send/receive plumbing for the v1 frame
5//! wire. `FrameClient` owns both: it assigns monotonically increasing
6//! request ids, frames the payload, and validates that the response
7//! echoes the id and payload protocol.
8//!
9//! This client is **blocking**. The default `client` cargo feature
10//! carries no async runtime, so async daemons either wrap calls in
11//! their runtime's `spawn_blocking` or enable the `client-async`
12//! feature and use the async twin
13//! [`AsyncFrameClient`](super::AsyncFrameClient) instead (#414).
14//! Calling [`FrameClient::request`] from a tokio task without
15//! `spawn_blocking` will block the runtime worker thread.
16//!
17//! [`BackendHandle::probe_with_service`]: crate::broker::backend_handle::BackendHandle::probe_with_service
18
19use std::io;
20
21use prost::Message;
22
23use crate::broker::protocol::{read_frame, write_frame, Endpoint, Frame, FrameKind, FramingError};
24
25/// Blocking request/response client for a backend daemon's frame lane.
26///
27/// ```no_run
28/// use running_process::broker::backend_sdk::FrameClient;
29/// use running_process::broker::protocol::Endpoint;
30///
31/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
32/// let endpoint = Endpoint::unix_socket("my-daemon", "/tmp/my-daemon.sock")?;
33/// let mut client = FrameClient::connect(&endpoint)?;
34/// let response = client.request(0x7A63, b"ping".to_vec())?;
35/// assert_eq!(response.payload, b"pong");
36/// # Ok(())
37/// # }
38/// ```
39pub struct FrameClient {
40    stream: io::BufReader<interprocess::local_socket::Stream>,
41    next_request_id: u64,
42}
43
44impl FrameClient {
45    /// Connect to a backend endpoint using the platform local-socket
46    /// name type (bare pipe name on Windows, filesystem path on Unix).
47    pub fn connect(endpoint: &Endpoint) -> Result<Self, FrameClientError> {
48        let connection = crate::broker::backend_handle::Connection::connect(endpoint)
49            .map_err(FrameClientError::Connect)?;
50        Ok(Self::from_stream(connection.into_inner()))
51    }
52
53    /// Wrap an already-connected local-socket stream (e.g. one opened
54    /// through a verified
55    /// [`BackendHandle`](crate::broker::backend_handle::BackendHandle)).
56    pub fn from_stream(stream: interprocess::local_socket::Stream) -> Self {
57        Self {
58            stream: io::BufReader::new(stream),
59            next_request_id: 1,
60        }
61    }
62
63    /// Send one request frame and block until its response arrives.
64    ///
65    /// Assigns the next request id, sends
66    /// `Frame::request(payload_protocol, payload)`, then reads frames
67    /// until one echoes the id. The returned frame is validated to be a
68    /// `RESPONSE` on the same payload protocol.
69    pub fn request(
70        &mut self,
71        payload_protocol: u32,
72        payload: Vec<u8>,
73    ) -> Result<Frame, FrameClientError> {
74        let request_id = self.next_request_id;
75        self.next_request_id = self.next_request_id.wrapping_add(1).max(1);
76
77        let frame = Frame::request(payload_protocol, payload).with_request_id(request_id);
78        let mut body = Vec::with_capacity(frame.encoded_len());
79        frame
80            .encode(&mut body)
81            .expect("prost encoding into Vec cannot fail because Vec writes are infallible");
82        write_frame(self.stream.get_mut(), &body)?;
83
84        let response_bytes = read_frame(&mut self.stream)?;
85        let response =
86            Frame::decode(response_bytes.as_slice()).map_err(FrameClientError::Decode)?;
87        if response.request_id != request_id {
88            return Err(FrameClientError::RequestIdMismatch {
89                expected: request_id,
90                got: response.request_id,
91            });
92        }
93        if response.payload_protocol != payload_protocol {
94            return Err(FrameClientError::PayloadProtocolMismatch {
95                expected: payload_protocol,
96                got: response.payload_protocol,
97            });
98        }
99        if FrameKind::try_from(response.kind) != Ok(FrameKind::Response) {
100            return Err(FrameClientError::NotAResponse {
101                kind: response.kind,
102            });
103        }
104        Ok(response)
105    }
106
107    /// The request id the next [`Self::request`] call will use.
108    pub fn next_request_id(&self) -> u64 {
109        self.next_request_id
110    }
111
112    /// Bytes the internal frame reader has buffered but not yet consumed.
113    ///
114    /// Zero on a client that has issued no [`Self::request`]. A consumer that
115    /// wants to take the raw socket back out via [`Self::into_stream`] checks
116    /// this first: nonzero means there is response data the bare socket would
117    /// not carry, so the take must be refused.
118    pub fn buffered_len(&self) -> usize {
119        self.stream.buffer().len()
120    }
121
122    /// Consume the client and return the underlying local-socket stream.
123    ///
124    /// Hands the negotiated socket back to a consumer that will speak its own
125    /// wire over it (see [`BrokerSession::into_backend_io`]). Any bytes still
126    /// buffered by the frame reader are dropped, so callers must verify
127    /// [`Self::buffered_len`] is zero before calling — which it always is on a
128    /// freshly adopted session that has issued no request.
129    ///
130    /// [`BrokerSession::into_backend_io`]: crate::broker::adopt::BrokerSession::into_backend_io
131    pub fn into_stream(self) -> interprocess::local_socket::Stream {
132        self.stream.into_inner()
133    }
134}
135
136/// Errors returned by [`FrameClient`].
137#[derive(Debug, thiserror::Error)]
138pub enum FrameClientError {
139    /// Opening the IPC connection failed.
140    #[error("frame client connect failed: {0}")]
141    Connect(io::Error),
142    /// v1 framing failed on the wire.
143    #[error(transparent)]
144    Framing(#[from] FramingError),
145    /// The response body was not a valid prost `Frame`.
146    #[error("failed to decode response Frame: {0}")]
147    Decode(prost::DecodeError),
148    /// The response did not echo the request id.
149    #[error("response request_id {got} does not match request {expected}")]
150    RequestIdMismatch {
151        /// The id this client assigned to the request.
152        expected: u64,
153        /// The id the peer echoed.
154        got: u64,
155    },
156    /// The response switched payload protocols mid-correlation.
157    #[error("response payload_protocol {got:#06X} does not match request {expected:#06X}")]
158    PayloadProtocolMismatch {
159        /// The payload protocol the request used.
160        expected: u32,
161        /// The payload protocol the peer answered with.
162        got: u32,
163    },
164    /// The correlated frame was not a `RESPONSE`.
165    #[error("correlated frame kind {kind} is not RESPONSE")]
166    NotAResponse {
167        /// The raw frame kind received.
168        kind: i32,
169    },
170}