1#[cfg(feature = "orchestrator")]
2pub mod future;
3pub mod remote_fn;
4
5#[cfg(feature = "remote")]
6use crate::remote_fn::REMOTE_FN_MAP;
7
8use tokio::{
9 io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
10 sync::mpsc,
11};
12use uuid::Uuid;
13
14pub use anyhow_serde::Result;
15pub use anyhow_serde::bail;
16
17#[doc(hidden)]
18pub use inventory;
19#[doc(hidden)]
20pub use postcard;
21#[doc(hidden)]
22pub use serde;
23#[doc(hidden)]
24pub use tokio;
25#[doc(hidden)]
26pub use uuid;
27
28pub use fox_macros::{main, remote, wire};
29
30#[doc(hidden)]
31#[derive(serde::Deserialize, serde::Serialize)]
32pub struct RemoteFnPayload {
33 pub uuid: Uuid,
34 pub fn_id: String,
35 pub data: Vec<u8>,
36}
37
38#[doc(hidden)]
39#[derive(serde::Deserialize, serde::Serialize)]
40pub struct RemoteFnResponsePayload {
41 pub uuid: Uuid,
42 pub data: Vec<u8>,
43}
44
/// Runs the remote side of the RPC protocol over this process's stdin and stdout.
///
/// Requests arrive on stdin as zero-terminated, COBS-encoded postcard frames that decode to
/// [`RemoteFnPayload`]. Each request is looked up in `REMOTE_FN_MAP` by its `fn_id`, executed
/// on the blocking thread pool, and answered on stdout with a COBS-encoded
/// [`RemoteFnResponsePayload`] carrying the request's `uuid`. Frames that fail to decode are
/// skipped without a response.
///
/// The function returns once stdin reaches end-of-file (a read error is treated the same way)
/// **and** every request accepted before that point has finished and had its response written
/// and flushed. A registered function that never returns therefore keeps this function from
/// returning.
///
/// # Panics
///
/// Panics if the path of the current executable cannot be determined or the executable cannot
/// be removed. Failures inside the spawned tasks (unregistered function, panicking function,
/// serialization or stdout errors) panic only the task in which they occur.
#[cfg(feature = "remote")]
pub async fn remote_rpc() {
    // Unlink this executable from disk before serving any request.
    std::fs::remove_file(std::env::current_exe().expect("Couldn't get current executable path"))
        .expect("Couldn't remove the executable");

    let (process_tx, mut process_rx) = mpsc::unbounded_channel::<RemoteFnPayload>();
    let (writer_tx, mut writer_rx) = mpsc::unbounded_channel::<Vec<u8>>();

    // Reader: splits stdin on the 0x00 frame delimiter and forwards decoded requests.
    tokio::spawn(async move {
        let mut stdin = BufReader::new(tokio::io::stdin());
        let mut buffer = Vec::with_capacity(4096);

        // `unwrap_or(0)` makes a read error end the loop exactly like end-of-file.
        while stdin.read_until(0x00, &mut buffer).await.unwrap_or(0) > 0 {
            if let Ok(payload) = postcard::from_bytes_cobs::<RemoteFnPayload>(&mut buffer) {
                process_tx.send(payload).expect("Processing channel closed");
            }
            buffer.clear();
        }
        // Dropping `process_tx` here closes the request channel and ends the dispatch loop.
    });

    // Writer: the single owner of stdout, so frames from concurrent handlers never interleave.
    // The handle is kept so pending responses can be drained before returning.
    let writer_task = tokio::spawn(async move {
        let mut stdout = tokio::io::stdout();
        while let Some(cobs_packet) = writer_rx.recv().await {
            stdout
                .write_all(&cobs_packet)
                .await
                .expect("Failed to write to stdout");
            stdout.flush().await.expect("Failed to flush stdout");
        }
    });

    // Dispatch: one task per request so slow functions do not block later requests.
    while let Some(payload) = process_rx.recv().await {
        let writer_tx = writer_tx.clone();

        tokio::spawn(async move {
            let RemoteFnPayload { uuid, fn_id, data } = payload;

            let registered_fn = REMOTE_FN_MAP
                .get(fn_id.as_str())
                .expect("Called an unregistered remote function");

            // Registered functions are synchronous; keep them off the async worker threads.
            let raw_return_bytes = tokio::task::spawn_blocking(move || registered_fn(&data))
                .await
                .expect("Remote function panicked");

            let response_payload = RemoteFnResponsePayload {
                uuid,
                data: raw_return_bytes,
            };

            let outbound_cobs = postcard::to_allocvec_cobs(&response_payload)
                .expect("Failed to serialize response");

            writer_tx
                .send(outbound_cobs)
                .expect("Writer channel closed");
        });
    }

    // Stdin is closed. Release our own sender so the writer's channel closes as soon as every
    // in-flight handler has dropped its clone, then wait for the writer to flush what is left.
    // Without this, responses still being computed would be lost when the caller returns.
    drop(writer_tx);
    // A panic inside the writer task has already been reported; nothing more to do with it.
    let _ = writer_task.await;
}
103
/// A remote call queued for the orchestrator engine, paired with the channel on which its
/// result is delivered.
#[cfg(feature = "orchestrator")]
pub struct OutboundRegistration {
    /// Request that is serialized and written out to the remote process.
    pub payload: RemoteFnPayload,
    /// Receives the `data` bytes of the response whose `uuid` equals `payload.uuid`.
    pub tx: tokio::sync::oneshot::Sender<Vec<u8>>,
}
109
#[cfg(feature = "orchestrator")]
std::thread_local! {
    /// Per-thread slot holding a sender of [`OutboundRegistration`] values.
    ///
    /// Every thread starts with `None`.
    /// NOTE(review): presumably filled in by the code that starts the orchestrator engine and
    /// read by the remote-call futures -- confirm against the callers.
    pub static ORCHESTRATOR_TX: std::cell::Cell<
        Option<mpsc::UnboundedSender<OutboundRegistration>>,
    > = const { std::cell::Cell::new(None) };
}
114
/// Drives the orchestrator side of the RPC protocol over this process's stdin and stdout.
///
/// Every [`OutboundRegistration`] received on `registration_rx` is serialized as a
/// COBS-encoded postcard frame and queued for stdout, and its `tx` is parked under the
/// request's `uuid`. Zero-terminated frames read from stdin are decoded as
/// [`RemoteFnResponsePayload`]; when a response's `uuid` matches a parked sender, the response
/// `data` is delivered through it. Frames that fail to decode, responses with an unknown
/// `uuid`, and stdout write errors are ignored.
///
/// The loop ends only once both `registration_rx` and the stdin reader have closed. Senders
/// still parked at that point are dropped when the function returns.
///
/// # Panics
///
/// Panics if a request payload cannot be serialized.
#[cfg(feature = "orchestrator")]
pub async fn orchestrator_rpc_engine(
    mut registration_rx: mpsc::UnboundedReceiver<OutboundRegistration>,
) {
    let (reader_tx, mut reader_rx) = mpsc::unbounded_channel::<RemoteFnResponsePayload>();
    let (writer_tx, mut writer_rx) = mpsc::unbounded_channel::<Vec<u8>>();

    // Senders waiting for a response, keyed by the uuid of the request they belong to.
    let mut pending_futures =
        std::collections::HashMap::<Uuid, tokio::sync::oneshot::Sender<Vec<u8>>>::new();

    // Reader: splits stdin on the 0x00 frame delimiter and forwards decoded responses.
    tokio::spawn(async move {
        let mut stdin = BufReader::new(tokio::io::stdin());
        let mut frame = Vec::with_capacity(4096);

        // `unwrap_or(0)` makes a read error end the loop exactly like end-of-file.
        while stdin.read_until(0x00, &mut frame).await.unwrap_or(0) > 0 {
            let decoded = postcard::from_bytes_cobs::<RemoteFnResponsePayload>(&mut frame);
            if let Ok(response) = decoded {
                // The engine loop may already be gone; that is not an error for the reader.
                let _ = reader_tx.send(response);
            }
            frame.clear();
        }
    });

    // Writer: the single owner of stdout. Write and flush failures are deliberately ignored.
    tokio::spawn(async move {
        let mut stdout = tokio::io::stdout();
        while let Some(cobs_packet) = writer_rx.recv().await {
            let _ = stdout.write_all(&cobs_packet).await;
            let _ = stdout.flush().await;
        }
    });

    loop {
        tokio::select! {
            // A closed channel yields `None`, which disables its branch for this iteration.
            Some(registration) = registration_rx.recv() => {
                let OutboundRegistration { payload, tx } = registration;

                let outbound_bytes = postcard::to_allocvec_cobs(&payload)
                    .expect("Failed to serialize request");
                let _ = writer_tx.send(outbound_bytes);

                // Parked in the same loop iteration as the send, and responses are handled
                // only by this loop, so a reply cannot be processed before its sender exists.
                pending_futures.insert(payload.uuid, tx);
            }

            Some(response) = reader_rx.recv() => {
                if let Some(tx) = pending_futures.remove(&response.uuid) {
                    // The caller may have stopped waiting; a failed delivery is fine.
                    let _ = tx.send(response.data);
                }
            }

            // Reached only when both channels are closed.
            // NOTE(review): while `registration_rx` stays open, a closed response stream
            // leaves parked senders unanswered -- confirm that this is intended.
            else => break,
        }
    }
}
169
170pub mod builtin {
171 use std::path::PathBuf;
172
173 use super as fox;
174 use fox_macros::remote;
175 use serde::{Deserialize, Serialize};
176
177 #[derive(Clone, Copy, Debug, Deserialize, Serialize)]
178 pub struct Mode(pub u32);
179 impl From<Mode> for std::fs::Permissions {
180 fn from(Mode(value): Mode) -> Self {
181 use std::os::unix::fs::PermissionsExt as _;
182 std::fs::Permissions::from_mode(value)
183 }
184 }
185 impl From<u32> for Mode {
186 fn from(value: u32) -> Self {
187 Mode(value)
188 }
189 }
190
191 #[derive(Clone, Debug, Deserialize, Serialize)]
192 pub struct CopyArgs {
193 pub source: Vec<u8>,
194 pub destination: PathBuf,
195 pub owner: String,
196 pub group: String,
197 pub mode: Mode,
198 }
199
200 #[derive(Debug, thiserror::Error)]
201 #[fox::wire]
202 pub enum CopyError {
203 #[error("Invalid user")]
204 InvalidUser,
205 #[error("Invalid group")]
206 InvalidGroup,
207 #[error("Parent directory unavailable")]
208 ParentDirUnavailable,
209 #[error("Couldn't create temporary file")]
210 CannotCreateTempFile,
211 #[error("Couldn't write to temporary file")]
212 CannotWriteToTempFile,
213 #[error("Couldn't change ownership of the file")]
214 Chown,
215 #[error("Couldn't change permissions of the file")]
216 Chmod,
217 #[error("Couldn't persist the temporary file to destination")]
218 TempFilePersist,
219 }
220
221 #[remote]
230 pub fn copy(args: CopyArgs) -> Result<(), CopyError> {
231 use std::io::Write as _;
232
233 let user = nix::unistd::User::from_name(&args.owner)
234 .ok()
235 .flatten()
236 .ok_or(CopyError::InvalidUser)?;
237 let group = nix::unistd::Group::from_name(&args.group)
238 .ok()
239 .flatten()
240 .ok_or(CopyError::InvalidGroup)?;
241
242 let mut tmp_file = tempfile::Builder::new()
243 .prefix(".fox.")
244 .tempfile_in(
245 args.destination
246 .parent()
247 .ok_or(CopyError::ParentDirUnavailable)?,
248 )
249 .map_err(|_| CopyError::CannotCreateTempFile)?;
250
251 tmp_file
252 .write_all(&args.source)
253 .map_err(|_| CopyError::CannotWriteToTempFile)?;
254
255 nix::unistd::chown(tmp_file.path(), Some(user.uid), Some(group.gid))
256 .map_err(|_| CopyError::Chown)?;
257 std::fs::set_permissions(tmp_file.path(), args.mode.into())
258 .map_err(|_| CopyError::Chmod)?;
259
260 tmp_file
261 .persist(&args.destination)
262 .map_err(|_| CopyError::TempFilePersist)?;
263
264 Ok(())
265 }
266}