1use anyhow::anyhow;
2use serde::{Deserialize, Serialize};
3use serde_bytes::ByteBuf;
4use serialize::NixSerializer;
5use std::{
6 ffi::OsStr,
7 io::{Read, Write},
8 os::unix::prelude::OsStrExt,
9 string::FromUtf8Error,
10};
11
12use worker_op::ValidPathInfo;
13
14pub mod framed_data;
15pub mod nar;
16pub mod serialize;
17pub mod stderr;
18pub mod worker_op;
19
20pub use serialize::{NixReadExt, NixWriteExt};
21
22use crate::worker_op::{Stream, WorkerOp};
23
24#[derive(Debug, thiserror::Error)]
25pub enum Error {
26 #[error("I/O error: {0}")]
27 Io(#[from] std::io::Error),
28
29 #[error("(De)serialization error: {0}")]
30 Deser(#[from] serialize::Error),
31
32 #[error("Other error: {0}")]
33 Other(#[from] anyhow::Error),
34}
35
36pub type Result<T, E = Error> = std::result::Result<T, E>;
37
38#[derive(Deserialize, Serialize, Clone, PartialEq, Debug, Eq, Hash)]
39#[serde(transparent)]
40pub struct StorePath(pub NixString);
41
42impl AsRef<[u8]> for StorePath {
43 fn as_ref(&self) -> &[u8] {
44 self.0.as_ref()
45 }
46}
47
48#[derive(Deserialize, Serialize, Clone, PartialEq, Debug)]
49#[serde(transparent)]
50pub struct Path(pub NixString);
51
52impl AsRef<[u8]> for Path {
53 fn as_ref(&self) -> &[u8] {
54 self.0.as_ref()
55 }
56}
57
58impl AsRef<OsStr> for Path {
59 fn as_ref(&self) -> &OsStr {
60 OsStr::from_bytes(self.as_ref())
61 }
62}
63
64#[derive(Deserialize, Serialize, Clone, PartialEq, Debug)]
65#[serde(transparent)]
66pub struct DerivedPath(pub NixString);
67
68impl AsRef<[u8]> for DerivedPath {
69 fn as_ref(&self) -> &[u8] {
70 self.0.as_ref()
71 }
72}
73
74#[derive(Deserialize, Serialize, Clone, PartialEq, Eq, Default, Hash)]
79#[serde(transparent)]
80pub struct NixString(pub ByteBuf);
81
82impl NixString {
83 pub fn to_string(&self) -> Result<String, FromUtf8Error> {
84 String::from_utf8(self.0.as_slice().to_owned())
85 }
86
87 pub fn from_bytes(bytes: &[u8]) -> Self {
88 NixString(ByteBuf::from(bytes.to_vec()))
89 }
90}
91
92impl std::fmt::Debug for NixString {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 f.write_str(&String::from_utf8_lossy(&self.0))
95 }
96}
97
98impl AsRef<[u8]> for NixString {
99 fn as_ref(&self) -> &[u8] {
100 self.0.as_ref()
101 }
102}
103
104impl AsRef<OsStr> for NixString {
105 fn as_ref(&self) -> &OsStr {
106 OsStr::from_bytes(self.as_ref())
107 }
108}
109
110const WORKER_MAGIC_1: u64 = 0x6e697863;
111const WORKER_MAGIC_2: u64 = 0x6478696f;
112const PROTOCOL_VERSION: DaemonVersion = DaemonVersion {
113 major: 1,
114 minor: 34,
115};
116
/// A spawned `nix-daemon --stdio` child process and its piped stdin/stdout.
///
/// The child's stderr is not redirected, so it is inherited from this process.
struct DaemonHandle {
    /// Handle to the child process, kept so it can be reaped on drop.
    child: std::process::Child,
    /// Pipe into the daemon's stdin.
    child_in: std::process::ChildStdin,
    /// Pipe from the daemon's stdout.
    child_out: std::process::ChildStdout,
}

impl DaemonHandle {
    /// Spawns `nix-daemon --stdio` with piped stdin and stdout.
    ///
    /// # Panics
    ///
    /// Panics if the process cannot be spawned.
    pub fn new() -> Self {
        let mut child = std::process::Command::new("nix-daemon")
            .arg("--stdio")
            .stdin(std::process::Stdio::piped())
            .stdout(std::process::Stdio::piped())
            .spawn()
            .expect("failed to spawn `nix-daemon --stdio`");

        // Both pipes were requested above, so the handles are always present.
        let child_in = child
            .stdin
            .take()
            .expect("child stdin was configured as piped");
        let child_out = child
            .stdout
            .take()
            .expect("child stdout was configured as piped");

        Self {
            child,
            child_in,
            child_out,
        }
    }
}

impl Default for DaemonHandle {
    fn default() -> Self {
        Self::new()
    }
}

impl Drop for DaemonHandle {
    /// Terminates and reaps the daemon so it does not linger as a zombie.
    fn drop(&mut self) {
        // `child_in` is still open while this runs (fields are dropped only
        // afterwards), so the daemon would not observe EOF on its stdin and a
        // plain `wait` could block. Kill first, then reap. Errors are ignored:
        // the process may already have exited, and nothing useful can be done
        // about a failure during drop.
        let _ = self.child.kill();
        let _ = self.child.wait();
    }
}
143
144pub struct NixProxy<R, W> {
149 pub read: NixRead<R>,
150 pub write: NixWrite<W>,
151 proxy: DaemonHandle,
152}
153
154impl<R: Read, W: Write> NixProxy<R, W> {
155 pub fn new(r: R, w: W) -> Self {
156 Self {
157 read: NixRead { inner: r },
158 write: NixWrite { inner: w },
159 proxy: DaemonHandle::new(),
160 }
161 }
162}
163
/// Thin wrapper around a reader `R`; its `impl` block provides typed read
/// helpers.
pub struct NixRead<R> {
    /// The wrapped reader.
    pub inner: R,
}
168
/// Thin wrapper around a writer `W`; its `impl` block provides typed write
/// helpers.
pub struct NixWrite<W> {
    /// The wrapped writer.
    pub inner: W,
}
173
174#[derive(Clone, Debug, Serialize, Deserialize)]
176pub struct PathSet {
177 pub paths: Vec<Path>,
178}
179
180#[derive(Clone, Debug, Serialize, Deserialize)]
182pub struct StorePathSet {
183 pub paths: Vec<StorePath>,
185}
186
187#[derive(Clone, Debug, Serialize, Deserialize)]
189pub struct StringSet {
190 pub paths: Vec<NixString>,
191}
192
193#[derive(Clone, Debug, Serialize, Deserialize)]
195pub struct Realisation(pub NixString);
196
197#[derive(Clone, Debug, Serialize, Deserialize)]
199pub struct RealisationSet {
200 pub realisations: Vec<Realisation>,
201}
202
203#[derive(Clone, Debug, Serialize, Deserialize)]
205pub struct NarHash {
206 pub data: ByteBuf,
208}
209
210#[derive(Clone, Debug, Serialize, Deserialize)]
211pub struct ValidPathInfoWithPath {
212 pub path: StorePath,
213 pub info: ValidPathInfo,
214}
215
216impl<R: Read> NixRead<R> {
217 pub fn read_u64(&mut self) -> serialize::Result<u64> {
219 self.inner.read_nix()
220 }
221
222 pub fn read_string(&mut self) -> serialize::Result<NixString> {
224 self.inner.read_nix()
225 }
226
227 pub fn read_nix(&mut self) -> serialize::Result<()> {
229 self.inner.read_nix()
230 }
231}
232
233impl<W: Write> NixWrite<W> {
234 pub fn write_u64(&mut self, n: u64) -> serialize::Result<()> {
236 self.inner.write_nix(&n)
237 }
238
239 pub fn write_string(&mut self, s: &[u8]) -> serialize::Result<()> {
241 NixSerializer {
242 write: &mut self.inner,
243 }
244 .write_byte_buf(s)
245 }
246
247 pub fn write_nix(&mut self, data: &impl Serialize) -> serialize::Result<()> {
254 self.inner.write_nix(data)
255 }
256
257 pub fn flush(&mut self) -> Result<()> {
259 Ok(self.inner.flush()?)
260 }
261}
262
263impl<R: Read, W: Write> NixProxy<R, W> {
264 pub fn handshake(&mut self) -> Result<u64> {
269 let magic = self.read.read_u64()?;
270 if magic != WORKER_MAGIC_1 {
271 eprintln!("{magic:x}");
272 eprintln!("{WORKER_MAGIC_1:x}");
273 todo!("handle error: protocol mismatch 1");
274 }
275
276 self.write.write_u64(WORKER_MAGIC_2)?;
277 self.write.write_u64(PROTOCOL_VERSION.into())?;
278 self.write.flush()?;
279
280 let client_version = self.read.read_u64()?;
281
282 if client_version < PROTOCOL_VERSION.into() {
283 Err(anyhow!("Client version {client_version} is too old"))?;
284 }
285
286 let mut _op_count: u64 = 0;
288
289 let _obsolete_cpu_affinity = self.read.read_u64()?;
290 let _obsolete_reserve_space = self.read.read_u64()?;
291 self.write.write_string("rust-nix-bazel-0.1.0".as_bytes())?;
292 self.write.flush()?;
293 Ok(PROTOCOL_VERSION.into())
294 }
295
296 fn forward_stderr(&mut self) -> Result<()> {
297 loop {
298 let msg: stderr::Msg = self.proxy.child_out.read_nix()?;
299 self.write.inner.write_nix(&msg)?;
300 eprintln!("read stderr msg {msg:?}");
301 self.write.inner.flush()?;
302
303 if msg == stderr::Msg::Last(()) {
304 break;
305 }
306 }
307 Ok(())
308 }
309
310 pub fn next_op(&mut self) -> Result<Option<WorkerOp>> {
311 match self.read.inner.read_nix::<WorkerOp>() {
312 Err(serialize::Error::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
313 Ok(None)
314 }
315 Err(e) => Err(e.into()),
316 Ok(x) => Ok(Some(x)),
317 }
318 }
319
320 pub fn process_connection(&mut self) -> Result<()>
322 where
323 W: Send,
324 {
325 let client_version = self.handshake()?;
326
327 self.proxy.child_in.write_nix(&WORKER_MAGIC_1)?;
329 self.proxy.child_in.flush()?;
330 let magic: u64 = self.proxy.child_out.read_nix()?;
331 if magic != WORKER_MAGIC_2 {
332 Err(anyhow!("unexpected WORKER_MAGIC_2: got {magic:x}"))?;
333 }
334 let protocol_version: u64 = self.proxy.child_out.read_nix()?;
335 if protocol_version < PROTOCOL_VERSION.into() {
336 Err(anyhow!(
337 "unexpected protocol version: got {protocol_version}"
338 ))?;
339 }
340 self.proxy.child_in.write_nix(&client_version)?;
341 self.proxy.child_in.write_nix(&0u64)?; self.proxy.child_in.write_nix(&0u64)?; self.proxy.child_in.flush()?;
344 let proxy_daemon_version: NixString = self.proxy.child_out.read_nix()?;
345 eprintln!(
346 "Proxy daemon is: {}",
347 String::from_utf8_lossy(proxy_daemon_version.0.as_ref())
348 );
349 self.forward_stderr()?;
350
351 loop {
352 let op = match self.read.inner.read_nix::<WorkerOp>() {
353 Err(serialize::Error::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
354 eprintln!("EOF, closing");
355 break;
356 }
357 x => x,
358 }?;
359
360 eprintln!("read op {op:?}");
361 self.proxy.child_in.write_nix(&op).unwrap();
362 op.stream(&mut self.read.inner, &mut self.proxy.child_in)
363 .unwrap();
364 self.proxy.child_in.flush().unwrap();
365
366 self.forward_stderr()?;
367
368 op.proxy_response(&mut self.proxy.child_out, &mut self.write.inner)?;
370 self.write.inner.flush()?;
371 }
372 Ok(())
373 }
374}
375
/// A protocol version split into major and minor components.
///
/// As a `u64`, the major component is bits 8..16 and the minor component is
/// bits 0..8.
#[derive(Clone, Copy, Debug, PartialEq)]
struct DaemonVersion {
    major: u8,
    minor: u8,
}

impl From<u64> for DaemonVersion {
    /// Decodes a version from the two lowest bytes of `x`; higher bytes are
    /// ignored.
    fn from(x: u64) -> Self {
        // Little-endian byte 0 is bits 0..8 (minor); byte 1 is bits 8..16 (major).
        let [minor, major, ..] = x.to_le_bytes();
        Self { major, minor }
    }
}

impl From<DaemonVersion> for u64 {
    /// Packs the version as `(major << 8) | minor`.
    fn from(version: DaemonVersion) -> Self {
        let high = u64::from(version.major) << 8;
        let low = u64::from(version.minor);
        high | low
    }
}
394}