zlayer_overlayd/client.rs
1//! Client the main `zlayer` daemon uses to drive overlayd over IPC.
2//!
3//! Holds one framed connection and issues request/response round-trips. The
4//! agent's `overlay_manager` shim wraps this (behind a `Mutex`) and maps its
5//! public methods to [`OverlaydRequest`]s.
6
7use std::path::{Path, PathBuf};
8use std::time::Duration;
9
10use zlayer_types::overlayd::{OverlaydFrame, OverlaydRequest, OverlaydResponse};
11
12use crate::error::{OverlaydError, Result};
13use crate::transport::{self, ClientConn};
14
15/// A connected overlayd client.
16pub struct OverlaydClient {
17 conn: ClientConn,
18 next_id: u64,
19 endpoint: PathBuf,
20}
21
22impl OverlaydClient {
23 /// Connect once to the overlayd endpoint.
24 ///
25 /// # Errors
26 /// Propagates the underlying connect error.
27 pub async fn connect(endpoint: &Path) -> Result<Self> {
28 let conn = transport::connect(endpoint).await?;
29 Ok(Self {
30 conn,
31 next_id: 1,
32 endpoint: endpoint.to_path_buf(),
33 })
34 }
35
36 /// Connect with exponential backoff (100ms → 1s, ~20 attempts) so the main
37 /// daemon can start before overlayd has finished binding its socket.
38 ///
39 /// Use this on the path that has *just spawned* overlayd and legitimately
40 /// needs to wait for it to bind (the supervisor). On hot paths where a dead
41 /// overlayd must degrade fast (overlay setup is non-fatal), prefer
42 /// [`Self::connect_with_attempts`] with a small budget so daemon startup is
43 /// not held hostage by an unreachable overlayd.
44 ///
45 /// # Errors
46 /// Returns the last connect error if every attempt fails.
47 pub async fn connect_with_backoff(endpoint: &Path) -> Result<Self> {
48 Self::connect_with_attempts(endpoint, 20).await
49 }
50
51 /// Connect with bounded exponential backoff (100ms → 1s) for at most
52 /// `max_attempts` attempts. Sleeps only *between* attempts (never after the
53 /// final one), so the worst-case wall time for a dead endpoint is bounded
54 /// and predictable: `max_attempts = 6` ≈ 100+200+400+800+1000 = ~2.5s.
55 ///
56 /// # Errors
57 /// Returns the last connect error if every attempt fails.
58 pub async fn connect_with_attempts(endpoint: &Path, max_attempts: u32) -> Result<Self> {
59 let attempts = max_attempts.max(1);
60 let mut delay = Duration::from_millis(100);
61 let max_delay = Duration::from_secs(1);
62 let mut last_err: Option<OverlaydError> = None;
63 for attempt in 0..attempts {
64 match transport::connect(endpoint).await {
65 Ok(conn) => {
66 return Ok(Self {
67 conn,
68 next_id: 1,
69 endpoint: endpoint.to_path_buf(),
70 });
71 }
72 Err(e) => {
73 last_err = Some(e);
74 // Don't sleep after the final attempt — it's wasted latency.
75 if attempt + 1 < attempts {
76 tokio::time::sleep(delay).await;
77 delay = std::cmp::min(delay * 2, max_delay);
78 }
79 }
80 }
81 }
82 Err(last_err.unwrap_or_else(|| OverlaydError::Other("overlayd connect failed".to_string())))
83 }
84
85 /// The endpoint this client is connected to.
86 #[must_use]
87 pub fn endpoint(&self) -> &Path {
88 &self.endpoint
89 }
90
91 /// Send a request and await its matching response. Event frames that arrive
92 /// while waiting are logged and skipped (a request/response client does not
93 /// subscribe to events).
94 ///
95 /// # Errors
96 /// Propagates transport errors. The returned [`OverlaydResponse`] may itself
97 /// be [`OverlaydResponse::Err`]; use [`Self::call`] to fold that into an
98 /// error.
99 pub async fn request(&mut self, request: OverlaydRequest) -> Result<OverlaydResponse> {
100 let id = self.next_id;
101 self.next_id = self.next_id.wrapping_add(1);
102 self.conn
103 .send(&OverlaydFrame::Request { id, request })
104 .await?;
105 loop {
106 match self.conn.recv().await? {
107 OverlaydFrame::Response { id: rid, response } if rid == id => return Ok(response),
108 OverlaydFrame::Response { id: rid, .. } => {
109 tracing::warn!(
110 expected = id,
111 got = rid,
112 "overlayd: out-of-order response id"
113 );
114 }
115 OverlaydFrame::Event(ev) => {
116 tracing::debug!(?ev, "overlayd: event with no subscriber; dropping");
117 }
118 OverlaydFrame::Request { .. } => {
119 return Err(OverlaydError::Other(
120 "overlayd sent a Request frame to a client".to_string(),
121 ));
122 }
123 }
124 }
125 }
126
127 /// Like [`Self::request`] but maps an `Err` response into
128 /// [`OverlaydError::Overlay`], so callers get a single `Result`.
129 ///
130 /// # Errors
131 /// Transport errors, or [`OverlaydError::Overlay`] if overlayd returned an
132 /// error response.
133 pub async fn call(&mut self, request: OverlaydRequest) -> Result<OverlaydResponse> {
134 match self.request(request).await? {
135 OverlaydResponse::Err { message } => Err(OverlaydError::Overlay(message)),
136 other => Ok(other),
137 }
138 }
139}
140
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Instant;

    /// A socket path that cannot exist, so every connect attempt fails.
    const DEAD_ENDPOINT: &str = "/nonexistent/zlayer-overlayd-test.sock";

    /// Upper bound for a connect sequence that must not pay any backoff sleep
    /// (comfortably below the first 100ms backoff step).
    const NO_SLEEP_BUDGET: Duration = Duration::from_millis(80);

    /// Runs `connect_with_attempts` against [`DEAD_ENDPOINT`] and reports
    /// whether it failed and how long it took.
    async fn connect_dead(max_attempts: u32) -> (bool, Duration) {
        let start = Instant::now();
        let res = OverlaydClient::connect_with_attempts(Path::new(DEAD_ENDPOINT), max_attempts).await;
        (res.is_err(), start.elapsed())
    }

    /// A single attempt against a dead endpoint must NOT sleep (the trailing-sleep
    /// fix): it should fail effectively immediately, not after the first backoff.
    #[tokio::test]
    async fn connect_single_attempt_does_not_sleep() {
        let (failed, elapsed) = connect_dead(1).await;
        assert!(failed, "connect to a nonexistent socket must fail");
        assert!(
            elapsed < NO_SLEEP_BUDGET,
            "1 attempt must not pay a backoff sleep (took {elapsed:?})"
        );
    }

    /// `connect_with_attempts` is bounded: against a dead endpoint it returns an
    /// error after roughly the sum of the inter-attempt backoffs, never hanging.
    /// 4 attempts ⇒ sleeps of 100+200+400 ≈ 700ms (no sleep after the 4th).
    #[tokio::test]
    async fn connect_with_attempts_is_bounded() {
        let (failed, elapsed) = connect_dead(4).await;
        assert!(failed);
        // Lower bound proves it actually retried with backoff; upper bound proves
        // it stayed bounded (and didn't sleep after the final attempt).
        assert!(
            elapsed >= Duration::from_millis(600),
            "4 attempts should back off ~700ms (took {elapsed:?})"
        );
        assert!(
            elapsed < Duration::from_millis(1500),
            "4 attempts must stay bounded (took {elapsed:?})"
        );
    }

    /// `max_attempts = 0` is clamped to 1 (one try, no sleep), never an infinite
    /// or zero-attempt loop. Both halves of that contract are asserted: it
    /// fails, and it fails without paying a backoff sleep.
    #[tokio::test]
    async fn connect_zero_attempts_clamped_to_one() {
        let (failed, elapsed) = connect_dead(0).await;
        assert!(failed);
        assert!(
            elapsed < NO_SLEEP_BUDGET,
            "0 attempts must clamp to a single sleep-free try (took {elapsed:?})"
        );
    }
}