Skip to main content

fox/
lib.rs

1#[cfg(feature = "orchestrator")]
2pub mod future;
3pub mod remote_fn;
4
5#[cfg(feature = "remote")]
6use crate::remote_fn::REMOTE_FN_MAP;
7
8use tokio::{
9    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
10    sync::mpsc,
11};
12use uuid::Uuid;
13
14pub use anyhow_serde::Result;
15pub use anyhow_serde::bail;
16
17#[doc(hidden)]
18pub use inventory;
19#[doc(hidden)]
20pub use postcard;
21#[doc(hidden)]
22pub use serde;
23#[doc(hidden)]
24pub use tokio;
25#[doc(hidden)]
26pub use uuid;
27
28pub use fox_macros::{main, remote, wire};
29
30#[doc(hidden)]
31#[derive(serde::Deserialize, serde::Serialize)]
32pub struct RemoteFnPayload {
33    pub uuid: Uuid,
34    pub fn_id: String,
35    pub data: Vec<u8>,
36}
37
38#[doc(hidden)]
39#[derive(serde::Deserialize, serde::Serialize)]
40pub struct RemoteFnResponsePayload {
41    pub uuid: Uuid,
42    pub data: Vec<u8>,
43}
44
/// Runs the remote-side RPC loop over this process's stdin and stdout.
///
/// The function first deletes the currently running executable from disk,
/// then:
///
/// * a reader task splits stdin on `0x00` bytes and decodes each frame as a
///   COBS/postcard-encoded [`RemoteFnPayload`]; frames that fail to decode
///   are skipped,
/// * every decoded call is dispatched on its own task, looked up in
///   `REMOTE_FN_MAP` by `fn_id` and executed on the blocking thread pool,
/// * a writer task sends each COBS/postcard-encoded
///   [`RemoteFnResponsePayload`] to stdout and flushes after every frame.
///
/// Once stdin reaches end-of-file (or a read error occurs) no new calls are
/// accepted, but the function only returns after every in-flight call has
/// finished and its response has been written, so replies are not lost when
/// the caller shuts the runtime down right after this function returns.
///
/// # Panics
///
/// Panics if the current executable cannot be located or removed. Individual
/// handler tasks panic (without stopping the loop) when the requested
/// function is not registered, when the function itself panics, or when the
/// response cannot be serialized or queued. The writer task panics when
/// stdout cannot be written or flushed.
#[cfg(feature = "remote")]
pub async fn remote_rpc() {
    std::fs::remove_file(std::env::current_exe().expect("Couldn't get current executable path"))
        .expect("Couldn't remove the executable");

    let (process_tx, mut process_rx) = mpsc::unbounded_channel::<RemoteFnPayload>();
    let (writer_tx, mut writer_rx) = mpsc::unbounded_channel::<Vec<u8>>();

    // Reader: stdin -> decoded payloads. Dropping `process_tx` when this task
    // ends is what terminates the dispatch loop below.
    tokio::spawn(async move {
        let mut stdin = BufReader::new(tokio::io::stdin());
        let mut buffer = Vec::with_capacity(4096);

        // A read error is treated the same as end-of-file.
        while stdin.read_until(0x00, &mut buffer).await.unwrap_or(0) > 0 {
            if let Ok(payload) = postcard::from_bytes_cobs::<RemoteFnPayload>(&mut buffer) {
                process_tx.send(payload).expect("Processing channel closed");
            }
            buffer.clear();
        }
    });

    // Writer: encoded responses -> stdout. The handle is kept so shutdown can
    // wait until every queued frame has been flushed.
    let writer_task = tokio::spawn(async move {
        let mut stdout = tokio::io::stdout();
        while let Some(cobs_packet) = writer_rx.recv().await {
            stdout
                .write_all(&cobs_packet)
                .await
                .expect("Failed to write to stdout");
            stdout.flush().await.expect("Failed to flush stdout");
        }
    });

    while let Some(payload) = process_rx.recv().await {
        let writer_tx = writer_tx.clone();

        tokio::spawn(async move {
            let RemoteFnPayload { uuid, fn_id, data } = payload;

            let registered_fn = REMOTE_FN_MAP
                .get(fn_id.as_str())
                .expect("Called an unregistered remote function");

            // Registered functions are synchronous; keep them off the async workers.
            let raw_return_bytes = tokio::task::spawn_blocking(move || registered_fn(&data))
                .await
                .expect("Remote function panicked");

            let response_payload = RemoteFnResponsePayload {
                uuid,
                data: raw_return_bytes,
            };

            let outbound_cobs = postcard::to_allocvec_cobs(&response_payload)
                .expect("Failed to serialize response");

            writer_tx
                .send(outbound_cobs)
                .expect("Writer channel closed");
        });
    }

    // Graceful drain: every in-flight handler owns a clone of `writer_tx`, so
    // once the local sender is dropped the writer's `recv()` yields `None`
    // only after all handlers have finished (or panicked) and every queued
    // response has been written. A panic inside the writer task surfaces as a
    // join error here and is deliberately ignored.
    drop(writer_tx);
    let _ = writer_task.await;
}
103
/// A remote call queued for sending, paired with the channel on which its
/// result is delivered.
#[cfg(feature = "orchestrator")]
pub struct OutboundRegistration {
    /// Request frame to serialize and send to the remote side.
    pub payload: RemoteFnPayload,
    /// Completed with the response bytes whose `uuid` matches `payload.uuid`.
    pub tx: tokio::sync::oneshot::Sender<Vec<u8>>,
}
109
#[cfg(feature = "orchestrator")]
std::thread_local! {
    /// Per-thread slot for the sender side of the registration channel
    /// consumed by `orchestrator_rpc_engine`; `None` until a sender is
    /// stored in it.
    pub static ORCHESTRATOR_TX: std::cell::Cell<
        Option<mpsc::UnboundedSender<OutboundRegistration>>,
    > = const { std::cell::Cell::new(None) };
}
114
/// Drives the orchestrator side of the RPC protocol.
///
/// Each [`OutboundRegistration`] received on `registration_rx` is encoded as
/// a COBS/postcard frame and written to stdout, and its oneshot sender is
/// remembered under the call's `uuid`. Frames read from stdin (split on
/// `0x00`) are decoded as [`RemoteFnResponsePayload`] and their `data` is
/// delivered to the sender registered under the same `uuid`. Undecodable
/// frames and responses with an unknown `uuid` are ignored.
///
/// A call that can no longer be answered has its oneshot sender dropped, so
/// the awaiting side is woken with a receive error instead of waiting
/// forever. This happens when:
///
/// * the payload cannot be serialized or queued for writing,
/// * the response stream (stdin) has ended, for all calls pending at that
///   moment and for every call registered afterwards.
///
/// The function returns once both the registration channel and the response
/// stream are closed.
#[cfg(feature = "orchestrator")]
pub async fn orchestrator_rpc_engine(
    mut registration_rx: mpsc::UnboundedReceiver<OutboundRegistration>,
) {
    let (reader_tx, mut reader_rx) = mpsc::unbounded_channel::<RemoteFnResponsePayload>();
    let (writer_tx, mut writer_rx) = mpsc::unbounded_channel::<Vec<u8>>();

    // Calls that were sent and are still waiting for their response.
    let mut pending_futures: std::collections::HashMap<
        Uuid,
        tokio::sync::oneshot::Sender<Vec<u8>>,
    > = std::collections::HashMap::new();

    // Reader: stdin -> decoded responses. Dropping `reader_tx` when this task
    // ends signals the main loop that no further responses can arrive.
    tokio::spawn(async move {
        let mut stdin = BufReader::new(tokio::io::stdin());
        let mut buffer = Vec::with_capacity(4096);

        // A read error is treated the same as end-of-file.
        while stdin.read_until(0x00, &mut buffer).await.unwrap_or(0) > 0 {
            if let Ok(response) = postcard::from_bytes_cobs::<RemoteFnResponsePayload>(&mut buffer)
            {
                let _ = reader_tx.send(response);
            }
            buffer.clear();
        }
    });

    // Writer: encoded requests -> stdout. Write failures are best-effort.
    tokio::spawn(async move {
        let mut stdout = tokio::io::stdout();
        while let Some(cobs_packet) = writer_rx.recv().await {
            let _ = stdout.write_all(&cobs_packet).await;
            let _ = stdout.flush().await;
        }
    });

    // Becomes false once the reader task has ended.
    let mut responses_open = true;

    loop {
        tokio::select! {
            Some(registration) = registration_rx.recv() => {
                let OutboundRegistration { payload, tx } = registration;

                // `tx` is stored only when the request was actually queued
                // while responses can still arrive; on every other path it is
                // dropped at the end of this arm, waking the awaiting side
                // with an error rather than leaving it pending forever.
                if responses_open {
                    if let Ok(outbound_bytes) = postcard::to_allocvec_cobs(&payload) {
                        if writer_tx.send(outbound_bytes).is_ok() {
                            pending_futures.insert(payload.uuid, tx);
                        }
                    }
                }
            }

            response = reader_rx.recv(), if responses_open => {
                match response {
                    Some(response) => {
                        if let Some(tx) = pending_futures.remove(&response.uuid) {
                            // The awaiting side may already be gone; that is fine.
                            let _ = tx.send(response.data);
                        }
                    }
                    None => {
                        // No response can ever arrive again: fail everything
                        // still outstanding by dropping its sender.
                        responses_open = false;
                        pending_futures.clear();
                    }
                }
            }

            // Reached only when the registration channel is closed and the
            // response stream has ended.
            else => break,
        }
    }
}
169
170pub mod builtin {
171    use std::path::PathBuf;
172
173    use super as fox;
174    use fox_macros::remote;
175    use serde::{Deserialize, Serialize};
176
177    #[derive(Clone, Copy, Debug, Deserialize, Serialize)]
178    pub struct Mode(pub u32);
179    impl From<Mode> for std::fs::Permissions {
180        fn from(Mode(value): Mode) -> Self {
181            use std::os::unix::fs::PermissionsExt as _;
182            std::fs::Permissions::from_mode(value)
183        }
184    }
185    impl From<u32> for Mode {
186        fn from(value: u32) -> Self {
187            Mode(value)
188        }
189    }
190
191    #[derive(Clone, Debug, Deserialize, Serialize)]
192    pub struct CopyArgs {
193        pub source: Vec<u8>,
194        pub destination: PathBuf,
195        pub owner: String,
196        pub group: String,
197        pub mode: Mode,
198    }
199
200    #[derive(Debug, thiserror::Error)]
201    #[fox::wire]
202    pub enum CopyError {
203        #[error("Invalid user")]
204        InvalidUser,
205        #[error("Invalid group")]
206        InvalidGroup,
207        #[error("Parent directory unavailable")]
208        ParentDirUnavailable,
209        #[error("Couldn't create temporary file")]
210        CannotCreateTempFile,
211        #[error("Couldn't write to temporary file")]
212        CannotWriteToTempFile,
213        #[error("Couldn't change ownership of the file")]
214        Chown,
215        #[error("Couldn't change permissions of the file")]
216        Chmod,
217        #[error("Couldn't persist the temporary file to destination")]
218        TempFilePersist,
219    }
220
221    /// Atomically copy content into a file on the remote machine
222    ///
223    /// ```no_run
224    /// fox::builtin::copy(fox::builtin::CopyArgs {
225    ///     source: "hello".into(),
226    ///     destination: "/var/tmp/hello.txt".into(),
227    /// }).unwrap()
228    /// ```
229    #[remote]
230    pub fn copy(args: CopyArgs) -> Result<(), CopyError> {
231        use std::io::Write as _;
232
233        let user = nix::unistd::User::from_name(&args.owner)
234            .ok()
235            .flatten()
236            .ok_or(CopyError::InvalidUser)?;
237        let group = nix::unistd::Group::from_name(&args.group)
238            .ok()
239            .flatten()
240            .ok_or(CopyError::InvalidGroup)?;
241
242        let mut tmp_file = tempfile::Builder::new()
243            .prefix(".fox.")
244            .tempfile_in(
245                args.destination
246                    .parent()
247                    .ok_or(CopyError::ParentDirUnavailable)?,
248            )
249            .map_err(|_| CopyError::CannotCreateTempFile)?;
250
251        tmp_file
252            .write_all(&args.source)
253            .map_err(|_| CopyError::CannotWriteToTempFile)?;
254
255        nix::unistd::chown(tmp_file.path(), Some(user.uid), Some(group.gid))
256            .map_err(|_| CopyError::Chown)?;
257        std::fs::set_permissions(tmp_file.path(), args.mode.into())
258            .map_err(|_| CopyError::Chmod)?;
259
260        tmp_file
261            .persist(&args.destination)
262            .map_err(|_| CopyError::TempFilePersist)?;
263
264        Ok(())
265    }
266}