Skip to main content

lb_rs/ipc/
mod.rs

1pub mod client;
2pub mod protocol;
3#[cfg(unix)]
4pub mod server;
5
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8
9use crate::ipc::client::RemoteCallError;
10use crate::ipc::protocol::Request;
11use crate::model::core_config::Config;
12use crate::model::errors::{LbErrKind, LbResult};
13use crate::service::logging;
14use crate::{Lb, LocalLb};
15
/// File name of the IPC socket created inside a writeable directory.
pub const SOCKET_FILENAME: &str = "lb.sock";

/// Builds the IPC socket location for the given writeable directory.
///
/// The result is `writeable_path` with [`SOCKET_FILENAME`] appended as the final component; the
/// file system is not touched.
pub fn socket_path(writeable_path: impl AsRef<Path>) -> PathBuf {
    let mut socket = writeable_path.as_ref().to_path_buf();
    socket.push(SOCKET_FILENAME);
    socket
}
21
22impl Lb {
23    pub fn is_local(&self) -> bool {
24        self.local.get().is_some()
25    }
26
27    pub fn try_local(&self) -> Option<LocalLb> {
28        self.local.get().cloned()
29    }
30
31    #[cfg_attr(not(unix), allow(dead_code))]
32    pub(crate) async fn recover(&self) -> LbResult<LocalLb> {
33        if let Some(local) = self.local.get() {
34            return Ok(local.clone());
35        }
36        let loc = match LocalLb::init(self.config.clone()).await {
37            Ok(l) => l,
38            Err(e) => {
39                if let Some(local) = self.local.get() {
40                    return Ok(local.clone());
41                }
42                return Err(e);
43            }
44        };
45        logging::init(&loc.config)?;
46        spawn_host(loc.clone());
47        match self.local.set(loc.clone()) {
48            Ok(()) => Ok(loc),
49            Err(_) => Ok(self.local.get().unwrap().clone()),
50        }
51    }
52
53    pub async fn call<Out>(&self, req: Request) -> LbResult<Out>
54    where
55        Out: serde::de::DeserializeOwned,
56    {
57        let remote = self
58            .remote
59            .as_ref()
60            .expect("Lb::call: remote must be set when local is unset");
61        match remote.try_call::<Out>(req.clone()).await {
62            Ok(v) => Ok(v),
63            #[cfg(unix)]
64            Err(RemoteCallError::HostUnavailable) => {
65                let local = self.recover().await?;
66                let bytes = server::dispatch(&local, req).await;
67                let result: LbResult<Out> = bincode::deserialize(&bytes).map_err(|e| {
68                    LbErrKind::Unexpected(format!("local dispatch deserialize: {e}"))
69                })?;
70                result
71            }
72            #[cfg(not(unix))]
73            Err(RemoteCallError::HostUnavailable) => {
74                let _ = req;
75                unreachable!("HostUnavailable cannot occur on non-unix targets")
76            }
77            Err(RemoteCallError::Other(e)) => Err(e),
78        }
79    }
80}
81
82pub fn spawn_host(lb: LocalLb) {
83    #[cfg(not(unix))]
84    {
85        let _ = lb;
86    }
87    #[cfg(unix)]
88    {
89        let socket = socket_path(&lb.config.writeable_path);
90        if socket.exists() {
91            let _ = std::fs::remove_file(&socket);
92        }
93        match tokio::net::UnixListener::bind(&socket) {
94            Ok(listener) => {
95                tokio::spawn(server::serve(listener, lb));
96            }
97            Err(err) => {
98                tracing::warn!(?err, "failed to bind ipc listener; guests cannot attach");
99            }
100        }
101    }
102}
103
104pub async fn connect_guest(config: &Config) -> Option<Arc<client::RemoteLb>> {
105    #[cfg(not(unix))]
106    {
107        let _ = config;
108        None
109    }
110    #[cfg(unix)]
111    {
112        let socket = socket_path(&config.writeable_path);
113        if !socket.exists() {
114            return None;
115        }
116        let mut attempts: u32 = 0;
117        let mut delay = std::time::Duration::from_millis(10);
118        loop {
119            match client::RemoteLb::connect(&socket).await {
120                Ok(c) => return Some(c),
121                Err(_) if attempts < 10 => {
122                    attempts += 1;
123                    tokio::time::sleep(delay).await;
124                    delay = std::cmp::min(delay * 2, std::time::Duration::from_millis(500));
125                }
126                Err(_) => return None,
127            }
128        }
129    }
130}