nix_remote/
lib.rs

1use anyhow::anyhow;
2use serde::{Deserialize, Serialize};
3use serde_bytes::ByteBuf;
4use serialize::NixSerializer;
5use std::{
6    ffi::OsStr,
7    io::{Read, Write},
8    os::unix::prelude::OsStrExt,
9    string::FromUtf8Error,
10};
11
12use worker_op::ValidPathInfo;
13
14pub mod framed_data;
15pub mod nar;
16pub mod serialize;
17pub mod stderr;
18pub mod worker_op;
19
20pub use serialize::{NixReadExt, NixWriteExt};
21
22use crate::worker_op::{Stream, WorkerOp};
23
/// The error type used by this crate's [`Result`] alias.
///
/// Every variant wraps a source error, and the `#[from]` attributes let `?`
/// convert the wrapped types into an [`Error`] automatically.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// An I/O failure (wraps `std::io::Error`).
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// A (de)serialization failure reported by the `serialize` module.
    #[error("(De)serialization error: {0}")]
    Deser(#[from] serialize::Error),

    /// Any other failure, carried as an `anyhow::Error`.
    #[error("Other error: {0}")]
    Other(#[from] anyhow::Error),
}
35
/// A `Result` whose error type defaults to this crate's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
37
38#[derive(Deserialize, Serialize, Clone, PartialEq, Debug, Eq, Hash)]
39#[serde(transparent)]
40pub struct StorePath(pub NixString);
41
42impl AsRef<[u8]> for StorePath {
43    fn as_ref(&self) -> &[u8] {
44        self.0.as_ref()
45    }
46}
47
48#[derive(Deserialize, Serialize, Clone, PartialEq, Debug)]
49#[serde(transparent)]
50pub struct Path(pub NixString);
51
52impl AsRef<[u8]> for Path {
53    fn as_ref(&self) -> &[u8] {
54        self.0.as_ref()
55    }
56}
57
58impl AsRef<OsStr> for Path {
59    fn as_ref(&self) -> &OsStr {
60        OsStr::from_bytes(self.as_ref())
61    }
62}
63
64#[derive(Deserialize, Serialize, Clone, PartialEq, Debug)]
65#[serde(transparent)]
66pub struct DerivedPath(pub NixString);
67
68impl AsRef<[u8]> for DerivedPath {
69    fn as_ref(&self) -> &[u8] {
70        self.0.as_ref()
71    }
72}
73
74/// A string from nix.
75///
76/// Strings in the nix protocol are not necessarily UTF-8, so this is
77/// different from the rust standard `String`.
78#[derive(Deserialize, Serialize, Clone, PartialEq, Eq, Default, Hash)]
79#[serde(transparent)]
80pub struct NixString(pub ByteBuf);
81
82impl NixString {
83    pub fn to_string(&self) -> Result<String, FromUtf8Error> {
84        String::from_utf8(self.0.as_slice().to_owned())
85    }
86
87    pub fn from_bytes(bytes: &[u8]) -> Self {
88        NixString(ByteBuf::from(bytes.to_vec()))
89    }
90}
91
92impl std::fmt::Debug for NixString {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        f.write_str(&String::from_utf8_lossy(&self.0))
95    }
96}
97
98impl AsRef<[u8]> for NixString {
99    fn as_ref(&self) -> &[u8] {
100        self.0.as_ref()
101    }
102}
103
104impl AsRef<OsStr> for NixString {
105    fn as_ref(&self) -> &OsStr {
106        OsStr::from_bytes(self.as_ref())
107    }
108}
109
/// Magic number the connecting side sends first; its bytes spell `nixc` in ASCII.
const WORKER_MAGIC_1: u64 = 0x6e697863;
/// Magic number sent in reply to `WORKER_MAGIC_1`; its bytes spell `dxio` in ASCII.
const WORKER_MAGIC_2: u64 = 0x6478696f;
/// The protocol version (1.34) that both handshakes compare the peer's version against.
const PROTOCOL_VERSION: DaemonVersion = DaemonVersion {
    major: 1,
    minor: 34,
};
116
/// The pipes connected to a spawned `nix-daemon --stdio` child process.
struct DaemonHandle {
    /// The child's stdin: everything written here is sent to the daemon.
    child_in: std::process::ChildStdin,
    /// The child's stdout: the daemon's replies are read from here.
    child_out: std::process::ChildStdout,
}

impl DaemonHandle {
    /// Spawns `nix-daemon --stdio` with piped stdin and stdout.
    ///
    /// # Panics
    ///
    /// Panics if the process cannot be spawned (for example because
    /// `nix-daemon` is not on `PATH`).
    pub fn new() -> Self {
        let mut child = std::process::Command::new("nix-daemon")
            .arg("--stdio")
            .stdin(std::process::Stdio::piped())
            .stdout(std::process::Stdio::piped())
            .spawn()
            .expect("failed to spawn `nix-daemon --stdio`");

        // Both handles exist because stdin and stdout were configured as piped above.
        let child_in = child
            .stdin
            .take()
            .expect("nix-daemon stdin was configured as piped");
        let child_out = child
            .stdout
            .take()
            .expect("nix-daemon stdout was configured as piped");

        // NOTE(review): `child` is dropped here without being waited on, so the
        // daemon process is never reaped by this handle — confirm that is intended.
        Self {
            child_in,
            child_out,
        }
    }
}

impl Default for DaemonHandle {
    /// Equivalent to [`DaemonHandle::new`], including its panics.
    fn default() -> Self {
        Self::new()
    }
}
143
144/// A proxy to the nix daemon.
145///
146/// This doesn't currently *do* very much, it just inspects the protocol as it goes past.
147/// But it can be used to test our protocol implementation.
148pub struct NixProxy<R, W> {
149    pub read: NixRead<R>,
150    pub write: NixWrite<W>,
151    proxy: DaemonHandle,
152}
153
154impl<R: Read, W: Write> NixProxy<R, W> {
155    pub fn new(r: R, w: W) -> Self {
156        Self {
157            read: NixRead { inner: r },
158            write: NixWrite { inner: w },
159            proxy: DaemonHandle::new(),
160        }
161    }
162}
163
/// A wrapper around a `std::io::Read`, adding support for the nix wire format.
pub struct NixRead<R> {
    /// The wrapped reader.
    pub inner: R,
}
168
/// A wrapper around a `std::io::Write`, adding support for the nix wire format.
pub struct NixWrite<W> {
    /// The wrapped writer.
    pub inner: W,
}
173
/// A set of paths.
///
/// The elements are stored in a `Vec`, so this type does not itself enforce
/// uniqueness or ordering.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PathSet {
    /// The paths in this set.
    pub paths: Vec<Path>,
}
179
/// A set of store paths.
///
/// The elements are stored in a `Vec`, so this type does not itself enforce
/// uniqueness or ordering.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StorePathSet {
    // TODO: in nix, they call `parseStorePath` to separate store directory from path
    /// The store paths in this set.
    pub paths: Vec<StorePath>,
}
186
/// A set of strings.
///
/// The elements are stored in a `Vec`, so this type does not itself enforce
/// uniqueness or ordering.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StringSet {
    /// The strings in this set (the field is named `paths` even though it
    /// holds arbitrary `NixString`s).
    pub paths: Vec<NixString>,
}
192
/// A realisation.
///
/// Stored as the raw [`NixString`]; this type does not parse its contents.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Realisation(pub NixString);
196
/// A set of realisations.
///
/// The elements are stored in a `Vec`, so this type does not itself enforce
/// uniqueness or ordering.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RealisationSet {
    /// The realisations in this set.
    pub realisations: Vec<Realisation>,
}
202
/// A nar hash.
///
/// Holds the hash as an opaque byte buffer.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NarHash {
    /// This data has not been validated; this is just copied from the wire.
    pub data: ByteBuf,
}
209
/// A [`ValidPathInfo`] paired with the store path it belongs to.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ValidPathInfoWithPath {
    /// The store path that `info` describes.
    pub path: StorePath,
    /// The path information itself.
    pub info: ValidPathInfo,
}
215
216impl<R: Read> NixRead<R> {
217    /// Read an integer from the wire.
218    pub fn read_u64(&mut self) -> serialize::Result<u64> {
219        self.inner.read_nix()
220    }
221
222    /// Read a "string" (really, a byte buffer) from the wire.
223    pub fn read_string(&mut self) -> serialize::Result<NixString> {
224        self.inner.read_nix()
225    }
226
227    /// Read any serializable type from the wire.
228    pub fn read_nix(&mut self) -> serialize::Result<()> {
229        self.inner.read_nix()
230    }
231}
232
233impl<W: Write> NixWrite<W> {
234    /// Write an integer to the wire.
235    pub fn write_u64(&mut self, n: u64) -> serialize::Result<()> {
236        self.inner.write_nix(&n)
237    }
238
239    /// Write a "string" (really, a byte buffer) to the wire.
240    pub fn write_string(&mut self, s: &[u8]) -> serialize::Result<()> {
241        NixSerializer {
242            write: &mut self.inner,
243        }
244        .write_byte_buf(s)
245    }
246
247    /// Write any serializable type to the wire.
248    ///
249    /// *Warning*: don't call this with `[u8]` data: that will (attempt to)
250    /// serialize a sequence of `u8`s, and then panic because the nix wire
251    /// protocol only supports 64-bit integers. If you want to write a byte
252    /// buffer, use [`NixWrite::write_string`] instead.
253    pub fn write_nix(&mut self, data: &impl Serialize) -> serialize::Result<()> {
254        self.inner.write_nix(data)
255    }
256
257    /// Flush the underlying writer.
258    pub fn flush(&mut self) -> Result<()> {
259        Ok(self.inner.flush()?)
260    }
261}
262
263impl<R: Read, W: Write> NixProxy<R, W> {
264    // Wait for an initialization message from the client, and perform
265    // the version negotiation.
266    //
267    // Returns the client version.
268    pub fn handshake(&mut self) -> Result<u64> {
269        let magic = self.read.read_u64()?;
270        if magic != WORKER_MAGIC_1 {
271            eprintln!("{magic:x}");
272            eprintln!("{WORKER_MAGIC_1:x}");
273            todo!("handle error: protocol mismatch 1");
274        }
275
276        self.write.write_u64(WORKER_MAGIC_2)?;
277        self.write.write_u64(PROTOCOL_VERSION.into())?;
278        self.write.flush()?;
279
280        let client_version = self.read.read_u64()?;
281
282        if client_version < PROTOCOL_VERSION.into() {
283            Err(anyhow!("Client version {client_version} is too old"))?;
284        }
285
286        // TODO keep track of number of WorkerOps performed
287        let mut _op_count: u64 = 0;
288
289        let _obsolete_cpu_affinity = self.read.read_u64()?;
290        let _obsolete_reserve_space = self.read.read_u64()?;
291        self.write.write_string("rust-nix-bazel-0.1.0".as_bytes())?;
292        self.write.flush()?;
293        Ok(PROTOCOL_VERSION.into())
294    }
295
296    fn forward_stderr(&mut self) -> Result<()> {
297        loop {
298            let msg: stderr::Msg = self.proxy.child_out.read_nix()?;
299            self.write.inner.write_nix(&msg)?;
300            eprintln!("read stderr msg {msg:?}");
301            self.write.inner.flush()?;
302
303            if msg == stderr::Msg::Last(()) {
304                break;
305            }
306        }
307        Ok(())
308    }
309
310    pub fn next_op(&mut self) -> Result<Option<WorkerOp>> {
311        match self.read.inner.read_nix::<WorkerOp>() {
312            Err(serialize::Error::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
313                Ok(None)
314            }
315            Err(e) => Err(e.into()),
316            Ok(x) => Ok(Some(x)),
317        }
318    }
319
320    /// Process a remote nix connection.
321    pub fn process_connection(&mut self) -> Result<()>
322    where
323        W: Send,
324    {
325        let client_version = self.handshake()?;
326
327        // Shake hands with the daemon that we're proxying.
328        self.proxy.child_in.write_nix(&WORKER_MAGIC_1)?;
329        self.proxy.child_in.flush()?;
330        let magic: u64 = self.proxy.child_out.read_nix()?;
331        if magic != WORKER_MAGIC_2 {
332            Err(anyhow!("unexpected WORKER_MAGIC_2: got {magic:x}"))?;
333        }
334        let protocol_version: u64 = self.proxy.child_out.read_nix()?;
335        if protocol_version < PROTOCOL_VERSION.into() {
336            Err(anyhow!(
337                "unexpected protocol version: got {protocol_version}"
338            ))?;
339        }
340        self.proxy.child_in.write_nix(&client_version)?;
341        self.proxy.child_in.write_nix(&0u64)?; // cpu affinity, obsolete
342        self.proxy.child_in.write_nix(&0u64)?; // reserve space, obsolete
343        self.proxy.child_in.flush()?;
344        let proxy_daemon_version: NixString = self.proxy.child_out.read_nix()?;
345        eprintln!(
346            "Proxy daemon is: {}",
347            String::from_utf8_lossy(proxy_daemon_version.0.as_ref())
348        );
349        self.forward_stderr()?;
350
351        loop {
352            let op = match self.read.inner.read_nix::<WorkerOp>() {
353                Err(serialize::Error::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
354                    eprintln!("EOF, closing");
355                    break;
356                }
357                x => x,
358            }?;
359
360            eprintln!("read op {op:?}");
361            self.proxy.child_in.write_nix(&op).unwrap();
362            op.stream(&mut self.read.inner, &mut self.proxy.child_in)
363                .unwrap();
364            self.proxy.child_in.flush().unwrap();
365
366            self.forward_stderr()?;
367
368            // Read back the actual response.
369            op.proxy_response(&mut self.proxy.child_out, &mut self.write.inner)?;
370            self.write.inner.flush()?;
371        }
372        Ok(())
373    }
374}
375
/// A protocol version, split into its major and minor components.
#[derive(Debug, Clone, Copy, PartialEq)]
struct DaemonVersion {
    major: u8,
    minor: u8,
}

impl From<u64> for DaemonVersion {
    /// Decodes a packed version: the lowest byte is the minor version and the
    /// byte above it is the major version. All higher bits are ignored.
    fn from(x: u64) -> Self {
        // In big-endian order the two least significant bytes come last.
        let [.., major, minor] = x.to_be_bytes();
        Self { major, minor }
    }
}

impl From<DaemonVersion> for u64 {
    /// Packs the version as `(major << 8) | minor`.
    fn from(version: DaemonVersion) -> Self {
        let packed = u16::from_be_bytes([version.major, version.minor]);
        u64::from(packed)
    }
}