Skip to main content

zlayer_overlayd/
client.rs

//! Client that the main `zlayer` daemon uses to drive overlayd over IPC.
//!
//! Holds one framed connection and issues request/response round-trips. The
//! agent's `overlay_manager` shim wraps this (behind a `Mutex`) and maps its
//! public methods to [`OverlaydRequest`]s.
6
7use std::path::{Path, PathBuf};
8use std::time::Duration;
9
10use zlayer_types::overlayd::{OverlaydFrame, OverlaydRequest, OverlaydResponse};
11
12use crate::error::{OverlaydError, Result};
13use crate::transport::{self, ClientConn};
14
15/// A connected overlayd client.
16pub struct OverlaydClient {
17    conn: ClientConn,
18    next_id: u64,
19    endpoint: PathBuf,
20}
21
22impl OverlaydClient {
23    /// Connect once to the overlayd endpoint.
24    ///
25    /// # Errors
26    /// Propagates the underlying connect error.
27    pub async fn connect(endpoint: &Path) -> Result<Self> {
28        let conn = transport::connect(endpoint).await?;
29        Ok(Self {
30            conn,
31            next_id: 1,
32            endpoint: endpoint.to_path_buf(),
33        })
34    }
35
36    /// Connect with exponential backoff (100ms → 1s, ~20 attempts) so the main
37    /// daemon can start before overlayd has finished binding its socket.
38    ///
39    /// Use this on the path that has *just spawned* overlayd and legitimately
40    /// needs to wait for it to bind (the supervisor). On hot paths where a dead
41    /// overlayd must degrade fast (overlay setup is non-fatal), prefer
42    /// [`Self::connect_with_attempts`] with a small budget so daemon startup is
43    /// not held hostage by an unreachable overlayd.
44    ///
45    /// # Errors
46    /// Returns the last connect error if every attempt fails.
47    pub async fn connect_with_backoff(endpoint: &Path) -> Result<Self> {
48        Self::connect_with_attempts(endpoint, 20).await
49    }
50
51    /// Connect with bounded exponential backoff (100ms → 1s) for at most
52    /// `max_attempts` attempts. Sleeps only *between* attempts (never after the
53    /// final one), so the worst-case wall time for a dead endpoint is bounded
54    /// and predictable: `max_attempts = 6` ≈ 100+200+400+800+1000 = ~2.5s.
55    ///
56    /// # Errors
57    /// Returns the last connect error if every attempt fails.
58    pub async fn connect_with_attempts(endpoint: &Path, max_attempts: u32) -> Result<Self> {
59        let attempts = max_attempts.max(1);
60        let mut delay = Duration::from_millis(100);
61        let max_delay = Duration::from_secs(1);
62        let mut last_err: Option<OverlaydError> = None;
63        for attempt in 0..attempts {
64            match transport::connect(endpoint).await {
65                Ok(conn) => {
66                    return Ok(Self {
67                        conn,
68                        next_id: 1,
69                        endpoint: endpoint.to_path_buf(),
70                    });
71                }
72                Err(e) => {
73                    last_err = Some(e);
74                    // Don't sleep after the final attempt — it's wasted latency.
75                    if attempt + 1 < attempts {
76                        tokio::time::sleep(delay).await;
77                        delay = std::cmp::min(delay * 2, max_delay);
78                    }
79                }
80            }
81        }
82        Err(last_err.unwrap_or_else(|| OverlaydError::Other("overlayd connect failed".to_string())))
83    }
84
85    /// The endpoint this client is connected to.
86    #[must_use]
87    pub fn endpoint(&self) -> &Path {
88        &self.endpoint
89    }
90
91    /// Send a request and await its matching response. Event frames that arrive
92    /// while waiting are logged and skipped (a request/response client does not
93    /// subscribe to events).
94    ///
95    /// # Errors
96    /// Propagates transport errors. The returned [`OverlaydResponse`] may itself
97    /// be [`OverlaydResponse::Err`]; use [`Self::call`] to fold that into an
98    /// error.
99    pub async fn request(&mut self, request: OverlaydRequest) -> Result<OverlaydResponse> {
100        let id = self.next_id;
101        self.next_id = self.next_id.wrapping_add(1);
102        self.conn
103            .send(&OverlaydFrame::Request { id, request })
104            .await?;
105        loop {
106            match self.conn.recv().await? {
107                OverlaydFrame::Response { id: rid, response } if rid == id => return Ok(response),
108                OverlaydFrame::Response { id: rid, .. } => {
109                    tracing::warn!(
110                        expected = id,
111                        got = rid,
112                        "overlayd: out-of-order response id"
113                    );
114                }
115                OverlaydFrame::Event(ev) => {
116                    tracing::debug!(?ev, "overlayd: event with no subscriber; dropping");
117                }
118                OverlaydFrame::Request { .. } => {
119                    return Err(OverlaydError::Other(
120                        "overlayd sent a Request frame to a client".to_string(),
121                    ));
122                }
123            }
124        }
125    }
126
127    /// Like [`Self::request`] but maps an `Err` response into
128    /// [`OverlaydError::Overlay`], so callers get a single `Result`.
129    ///
130    /// # Errors
131    /// Transport errors, or [`OverlaydError::Overlay`] if overlayd returned an
132    /// error response.
133    pub async fn call(&mut self, request: OverlaydRequest) -> Result<OverlaydResponse> {
134        match self.request(request).await? {
135            OverlaydResponse::Err { message } => Err(OverlaydError::Overlay(message)),
136            other => Ok(other),
137        }
138    }
139}
140
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Instant;

    /// A socket path under a directory that should not exist, so every connect
    /// attempt against it fails.
    const DEAD_ENDPOINT: &str = "/nonexistent/zlayer-overlayd-test.sock";

    /// Upper bound for "returned without paying any backoff sleep"; comfortably
    /// below the 100ms first backoff.
    const NO_SLEEP_BUDGET: Duration = Duration::from_millis(80);

    fn dead_endpoint() -> &'static Path {
        Path::new(DEAD_ENDPOINT)
    }

    /// A single attempt against a dead endpoint must NOT sleep (the trailing-sleep
    /// fix): it should fail effectively immediately, not after the first backoff.
    #[tokio::test]
    async fn connect_single_attempt_does_not_sleep() {
        let start = Instant::now();
        let res = OverlaydClient::connect_with_attempts(dead_endpoint(), 1).await;
        let elapsed = start.elapsed();
        assert!(res.is_err(), "connect to a nonexistent socket must fail");
        assert!(
            elapsed < NO_SLEEP_BUDGET,
            "1 attempt must not pay a backoff sleep (took {elapsed:?})"
        );
    }

    /// `connect_with_attempts` is bounded: against a dead endpoint it returns an
    /// error after roughly the sum of the inter-attempt backoffs, never hanging.
    /// 4 attempts ⇒ sleeps of 100+200+400 ≈ 700ms (no sleep after the 4th).
    #[tokio::test]
    async fn connect_with_attempts_is_bounded() {
        let start = Instant::now();
        let res = OverlaydClient::connect_with_attempts(dead_endpoint(), 4).await;
        let elapsed = start.elapsed();
        assert!(res.is_err());
        // Lower bound proves it actually retried with backoff; upper bound proves
        // it stayed bounded (and didn't sleep after the final attempt).
        assert!(
            elapsed >= Duration::from_millis(600),
            "4 attempts should back off ~700ms (took {elapsed:?})"
        );
        assert!(
            elapsed < Duration::from_millis(1500),
            "4 attempts must stay bounded (took {elapsed:?})"
        );
    }

    /// `max_attempts = 0` is clamped to 1: exactly one real connect attempt is
    /// made (so the error comes from the transport, not the synthetic
    /// "no attempt recorded" fallback) and no backoff sleep is paid.
    #[tokio::test]
    async fn connect_zero_attempts_clamped_to_one() {
        let start = Instant::now();
        let res = OverlaydClient::connect_with_attempts(dead_endpoint(), 0).await;
        let elapsed = start.elapsed();
        let err = match res {
            Ok(_) => panic!("connect to a nonexistent socket must fail"),
            Err(e) => e,
        };
        // Without the clamp the loop body never runs and the function returns
        // its fallback error instead of a real connect error.
        assert!(
            !matches!(&err, OverlaydError::Other(msg) if msg == "overlayd connect failed"),
            "0 attempts must be clamped to one real connect attempt"
        );
        assert!(
            elapsed < NO_SLEEP_BUDGET,
            "0 attempts must not pay a backoff sleep (took {elapsed:?})"
        );
    }
}