1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
//! Client used by the main `zlayer` daemon to drive overlayd over IPC.
//!
//! Holds one framed connection and issues request/response round-trips. The
//! agent's `overlay_manager` shim wraps this (behind a `Mutex`) and maps its
//! public methods to [`OverlaydRequest`]s.
use std::path::{Path, PathBuf};
use std::time::Duration;
use zlayer_types::overlayd::{OverlaydFrame, OverlaydRequest, OverlaydResponse};
use crate::error::{OverlaydError, Result};
use crate::transport::{self, ClientConn};
/// A connected overlayd client.
///
/// Owns a single framed connection. Request methods take `&mut self`, so at
/// most one round-trip is in flight per client; callers needing shared access
/// must serialize it themselves (e.g. behind a `Mutex`).
pub struct OverlaydClient {
    /// Framed IPC connection to overlayd.
    conn: ClientConn,
    /// Id assigned to the next outgoing request; starts at 1 and wraps on overflow.
    next_id: u64,
    /// Endpoint this client connected to, as passed to the constructor.
    endpoint: PathBuf,
}

impl std::fmt::Debug for OverlaydClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `ClientConn` is not assumed to implement `Debug`, so the connection
        // itself is elided and marked with `..`.
        f.debug_struct("OverlaydClient")
            .field("endpoint", &self.endpoint)
            .field("next_id", &self.next_id)
            .finish_non_exhaustive()
    }
}
impl OverlaydClient {
/// Connect once to the overlayd endpoint.
///
/// # Errors
/// Propagates the underlying connect error.
pub async fn connect(endpoint: &Path) -> Result<Self> {
let conn = transport::connect(endpoint).await?;
Ok(Self {
conn,
next_id: 1,
endpoint: endpoint.to_path_buf(),
})
}
/// Connect with exponential backoff (100ms → 1s, ~20 attempts) so the main
/// daemon can start before overlayd has finished binding its socket.
///
/// Use this on the path that has *just spawned* overlayd and legitimately
/// needs to wait for it to bind (the supervisor). On hot paths where a dead
/// overlayd must degrade fast (overlay setup is non-fatal), prefer
/// [`Self::connect_with_attempts`] with a small budget so daemon startup is
/// not held hostage by an unreachable overlayd.
///
/// # Errors
/// Returns the last connect error if every attempt fails.
pub async fn connect_with_backoff(endpoint: &Path) -> Result<Self> {
Self::connect_with_attempts(endpoint, 20).await
}
/// Connect with bounded exponential backoff (100ms → 1s) for at most
/// `max_attempts` attempts. Sleeps only *between* attempts (never after the
/// final one), so the worst-case wall time for a dead endpoint is bounded
/// and predictable: `max_attempts = 6` ≈ 100+200+400+800+1000 = ~2.5s.
///
/// # Errors
/// Returns the last connect error if every attempt fails.
pub async fn connect_with_attempts(endpoint: &Path, max_attempts: u32) -> Result<Self> {
let attempts = max_attempts.max(1);
let mut delay = Duration::from_millis(100);
let max_delay = Duration::from_secs(1);
let mut last_err: Option<OverlaydError> = None;
for attempt in 0..attempts {
match transport::connect(endpoint).await {
Ok(conn) => {
return Ok(Self {
conn,
next_id: 1,
endpoint: endpoint.to_path_buf(),
});
}
Err(e) => {
last_err = Some(e);
// Don't sleep after the final attempt — it's wasted latency.
if attempt + 1 < attempts {
tokio::time::sleep(delay).await;
delay = std::cmp::min(delay * 2, max_delay);
}
}
}
}
Err(last_err.unwrap_or_else(|| OverlaydError::Other("overlayd connect failed".to_string())))
}
/// The endpoint this client is connected to.
#[must_use]
pub fn endpoint(&self) -> &Path {
&self.endpoint
}
/// Send a request and await its matching response. Event frames that arrive
/// while waiting are logged and skipped (a request/response client does not
/// subscribe to events).
///
/// # Errors
/// Propagates transport errors. The returned [`OverlaydResponse`] may itself
/// be [`OverlaydResponse::Err`]; use [`Self::call`] to fold that into an
/// error.
pub async fn request(&mut self, request: OverlaydRequest) -> Result<OverlaydResponse> {
let id = self.next_id;
self.next_id = self.next_id.wrapping_add(1);
self.conn
.send(&OverlaydFrame::Request { id, request })
.await?;
loop {
match self.conn.recv().await? {
OverlaydFrame::Response { id: rid, response } if rid == id => return Ok(response),
OverlaydFrame::Response { id: rid, .. } => {
tracing::warn!(
expected = id,
got = rid,
"overlayd: out-of-order response id"
);
}
OverlaydFrame::Event(ev) => {
tracing::debug!(?ev, "overlayd: event with no subscriber; dropping");
}
OverlaydFrame::Request { .. } => {
return Err(OverlaydError::Other(
"overlayd sent a Request frame to a client".to_string(),
));
}
}
}
}
/// Like [`Self::request`] but maps an `Err` response into
/// [`OverlaydError::Overlay`], so callers get a single `Result`.
///
/// # Errors
/// Transport errors, or [`OverlaydError::Overlay`] if overlayd returned an
/// error response.
pub async fn call(&mut self, request: OverlaydRequest) -> Result<OverlaydResponse> {
match self.request(request).await? {
OverlaydResponse::Err { message } => Err(OverlaydError::Overlay(message)),
other => Ok(other),
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Instant;

    /// Socket path that should never exist, so every connect fails fast.
    const DEAD_ENDPOINT: &str = "/nonexistent/zlayer-overlayd-test.sock";

    /// Upper bound for "no backoff sleep happened". It sits below the 100ms
    /// first backoff, so a stray sleep is detected, with slack for slow CI.
    const NO_SLEEP_BUDGET: Duration = Duration::from_millis(80);

    fn dead() -> &'static Path {
        Path::new(DEAD_ENDPOINT)
    }

    /// Plain `connect` makes exactly one attempt and never backs off.
    #[tokio::test]
    async fn connect_fails_fast_on_dead_endpoint() {
        let start = Instant::now();
        let res = OverlaydClient::connect(dead()).await;
        let elapsed = start.elapsed();
        assert!(res.is_err(), "connect to a nonexistent socket must fail");
        assert!(
            elapsed < NO_SLEEP_BUDGET,
            "connect must not pay a backoff sleep (took {elapsed:?})"
        );
    }

    /// A single attempt against a dead endpoint must NOT sleep (the trailing-sleep
    /// fix). It should fail effectively immediately, not after the first backoff.
    #[tokio::test]
    async fn connect_single_attempt_does_not_sleep() {
        let start = Instant::now();
        let res = OverlaydClient::connect_with_attempts(dead(), 1).await;
        let elapsed = start.elapsed();
        assert!(res.is_err(), "connect to a nonexistent socket must fail");
        assert!(
            elapsed < NO_SLEEP_BUDGET,
            "1 attempt must not pay a backoff sleep (took {elapsed:?})"
        );
    }

    /// `connect_with_attempts` is bounded. Against a dead endpoint it returns an
    /// error after roughly the sum of the inter-attempt backoffs, never hanging.
    /// 4 attempts ⇒ sleeps of 100+200+400 ≈ 700ms (no sleep after the 4th).
    #[tokio::test]
    async fn connect_with_attempts_is_bounded() {
        let start = Instant::now();
        let res = OverlaydClient::connect_with_attempts(dead(), 4).await;
        let elapsed = start.elapsed();
        assert!(res.is_err());
        // The lower bound proves it actually retried with backoff. The upper
        // bound proves it stayed bounded and didn't sleep after the final attempt.
        assert!(
            elapsed >= Duration::from_millis(600),
            "4 attempts should back off ~700ms (took {elapsed:?})"
        );
        assert!(
            elapsed < Duration::from_millis(1500),
            "4 attempts must stay bounded (took {elapsed:?})"
        );
    }

    /// `max_attempts = 0` is clamped to 1 (one try, no sleep). It never becomes
    /// an infinite loop or a zero-attempt loop.
    #[tokio::test]
    async fn connect_zero_attempts_clamped_to_one() {
        let start = Instant::now();
        let res = OverlaydClient::connect_with_attempts(dead(), 0).await;
        let elapsed = start.elapsed();
        assert!(res.is_err());
        assert!(
            elapsed < NO_SLEEP_BUDGET,
            "0 attempts is clamped to 1 and must not sleep (took {elapsed:?})"
        );
    }
}