nix_daemon/nix/
wire.rs

1// SPDX-FileCopyrightText: 2023 embr <git@liclac.eu>
2//
3// SPDX-License-Identifier: EUPL-1.2
4
5//! Low-level helpers for the nix-daemon wire format.
6
7use crate::{
8    nix::Proto, BuildMode, BuildResult, BuildResultStatus, ClientSettings, Error, NixError,
9    PathInfo, Result, ResultExt, Stderr, StderrField, StderrResult, StderrStartActivity, Verbosity,
10};
11use async_stream::try_stream;
12use chrono::{DateTime, Utc};
13use futures::{future::OptionFuture, Future};
14use num_enum::{IntoPrimitive, TryFromPrimitive, TryFromPrimitiveError};
15use std::collections::HashMap;
16use std::fmt::Debug;
17use tap::{Tap, TapFallible};
18use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, ReadBuf};
19use tokio_stream::{Stream, StreamExt};
20use tracing::{instrument, trace};
21
22/// Magic number sent by the client.
23pub const WORKER_MAGIC_1: u64 = 0x6e697863;
24/// Magic number sent by the daemon.
25pub const WORKER_MAGIC_2: u64 = 0x6478696f;
26
27/// Opcodes.
28///
29/// Not included are Ops that were obsolete before the earliest Nix version we support:
30///
31/// - Nix 2.0 (2016-04-19: e0204f8d462041387651af388074491fd0bf36d6)
32///   - QueryPathHash = 4
33///   - QueryReferences = 5
34///   - QueryDeriver = 18
35/// - Nix 2.0 (2016-05-04: 538a64e8c314f23ba0c5d76201f1c20e71884a21)
36///   - ExportPath = 16
37///   - ImportPaths = 27
38#[derive(Debug, TryFromPrimitive, IntoPrimitive)]
39#[repr(u64)]
40pub enum Op {
41    IsValidPath = 1,
42    HasSubstitutes = 3,
43    QueryReferrers = 6,
44    AddToStore = 7,
45    BuildPaths = 9,
46    EnsurePath = 10,
47    AddTempRoot = 11,
48    AddIndirectRoot = 12,
49    SyncWithGC = 13,
50    FindRoots = 14,
51    SetOptions = 19,
52    CollectGarbage = 20, // TODO: Can't be safely cargo tested on host daemon+store.
53    QuerySubstitutablePathInfo = 21,
54    QueryAllValidPaths = 23,
55    QueryFailedPaths = 24,
56    ClearFailedPaths = 25,
57    QueryPathInfo = 26,
58    QueryPathFromHashPart = 29,
59    QuerySubstitutablePathInfos = 30,
60    QueryValidPaths = 31,
61    QuerySubstitutablePaths = 32,
62    QueryValidDerivers = 33,
63    OptimiseStore = 34, // TODO: Can't be safely cargo tested on host daemon+store.
64    VerifyStore = 35,   // TODO: Can't be safely cargo tested on host daemon+store.
65    BuildDerivation = 36,
66    AddSignatures = 37,
67    NarFromPath = 38,
68    AddToStoreNar = 39,
69    QueryMissing = 40,
70    QueryDerivationOutputMap = 41,
71    RegisterDrvOutput = 42,
72    QueryRealisation = 43,
73    AddMultipleToStore = 44,
74    AddBuildLog = 45,
75    BuildPathsWithResults = 46,
76
77    /// Obsolete since Nix 2.4, use AddToStore.
78    /// <https://github.com/NixOS/nix/commit/c602ebfb34de3626fa0b9110face6ea4b171ac0f>
79    AddTextToStore = 8,
80    /// Obsolete since Nix 2.4, use QueryDerivationOutputMap.
81    /// <https://github.com/NixOS/nix/commit/d38f860c3ef001a456d4d447f89219de5e3c830c>
82    QueryDerivationOutputs = 22,
83    /// Obsolete since Nix 2.4, get it from any derivation struct.
84    /// <https://github.com/NixOS/nix/commit/045b07200c77bf1fe19c0a986aafb531e7e1ba54>
85    QueryDerivationOutputNames = 28,
86}
87impl From<TryFromPrimitiveError<Op>> for Error {
88    fn from(value: TryFromPrimitiveError<Op>) -> Self {
89        Self::Invalid(format!("Op({:x})", value.number))
90    }
91}
92
93/// Reader compatible with CppNix' FramedSource/FramedSink protocol.
94///
95/// Each "frame" is a u64 length, followed by that number of bytes.
96/// The stream is terminated by a frame of length 0.
97#[derive(Debug)]
98pub struct FramedReader<'r, R: AsyncReadExt + Unpin + Debug> {
99    r: &'r mut R,
100    frame_len: usize,
101}
102
103impl<'r, R: AsyncReadExt + Unpin + Debug> FramedReader<'r, R> {
104    pub fn new(r: &'r mut R) -> Self {
105        Self { r, frame_len: 0 }
106    }
107
108    pub async fn read_chunked(&mut self, buf: &mut ReadBuf<'_>) -> std::io::Result<()> {
109        if self.frame_len == 0 {
110            self.frame_len = read_u64(self.r)
111                .await?
112                .try_into()
113                .expect("u64 chunk length doesn't fit into usize");
114            trace!(self.frame_len, "read frame header");
115        }
116        if self.frame_len > 0 {
117            let chunk_len = self
118                .r
119                .read(buf.initialize_unfilled_to(std::cmp::min(self.frame_len, buf.remaining())))
120                .await?;
121            buf.advance(chunk_len);
122            self.frame_len = self
123                .frame_len
124                .checked_sub(chunk_len)
125                .expect("read more than chunk_len, somehow");
126            trace!(chunk_len, remaining = self.frame_len, "read frame chunk");
127        }
128        Ok(())
129    }
130}
131
132impl<'r, R: AsyncReadExt + Unpin + Debug> AsyncRead for FramedReader<'r, R> {
133    fn poll_read(
134        mut self: std::pin::Pin<&mut Self>,
135        cx: &mut std::task::Context<'_>,
136        buf: &mut tokio::io::ReadBuf<'_>,
137    ) -> std::task::Poll<std::io::Result<()>> {
138        let read = self.read_chunked(buf);
139        tokio::pin!(read);
140        read.as_mut().as_mut().poll(cx)
141    }
142}
143
144#[instrument(skip_all, level = "trace")]
145pub async fn copy_to_framed<R: AsyncReadExt + Unpin, W: AsyncWriteExt + Unpin>(
146    r: &mut R,
147    w: &mut W,
148    buf: &mut [u8],
149) -> Result<()> {
150    loop {
151        let len = r.read(buf).await?;
152        write_u64(w, len as u64).await?;
153        if len == 0 {
154            trace!("Done");
155            return Ok(());
156        }
157        w.write_all(&buf[..len]).await?;
158        trace!(len, "Copied frame...");
159    }
160}
161
162/// Read a u64 from the stream (little endian).
163#[instrument(skip(r), level = "trace")]
164pub async fn read_u64<R: AsyncReadExt + Unpin>(r: &mut R) -> std::io::Result<u64> {
165    Ok(r.read_u64_le().await.tap_ok(|v| trace!(v, "<-"))?)
166}
167/// Write a u64 from the stream (little endian).
168#[instrument(skip(w, v), level = "trace")]
169pub async fn write_u64<W: AsyncWriteExt + Unpin>(w: &mut W, v: u64) -> std::io::Result<()> {
170    Ok(w.write_u64_le(v.tap(|v| trace!(v, "->"))).await?)
171}
172
173/// Read a boolean from the stream, encoded as u64 (>0 is true).
174#[instrument(skip(r), level = "trace")]
175pub async fn read_bool<R: AsyncReadExt + Unpin>(r: &mut R) -> std::io::Result<bool> {
176    Ok(read_u64(r)
177        .await
178        .map(|v| v > 0)
179        .tap_ok(|v| trace!(v, "<-"))?)
180}
181/// Write a boolean to the stream, encoded as u64 (>0 is true).
182#[instrument(skip(w, v), level = "trace")]
183pub async fn write_bool<W: AsyncWriteExt + Unpin>(w: &mut W, v: bool) -> std::io::Result<()> {
184    Ok(write_u64(w, if v { 1 } else { 0 }.tap(|v| trace!(v, "->"))).await?)
185}
186
187/// Read a DateTime (CppNix: time_t) from the stream, encoded as a unix timestamp.
188#[instrument(skip(r), level = "trace")]
189pub async fn read_datetime<R: AsyncReadExt + Unpin>(r: &mut R) -> Result<DateTime<Utc>> {
190    read_u64(r).await.map_err(Into::into).and_then(|ts| {
191        DateTime::from_timestamp(ts as i64, 0)
192            .ok_or_else(|| Error::Invalid(ts.to_string()))
193            .tap_ok(|dt| trace!(?dt, "<-"))
194    })
195}
196/// Write a DateTime (CppNix: time_t) from the stream, encoded as a unix timestamp.
197#[instrument(skip(w), level = "trace")]
198pub async fn write_datetime<W: AsyncWriteExt + Unpin>(w: &mut W, dt: DateTime<Utc>) -> Result<()> {
199    Ok(write_u64(
200        w,
201        dt.timestamp()
202            .tap(|dt| trace!(?dt, "->"))
203            .try_into()
204            .map_err(|err| Error::Invalid(format!("DateTime({}): {}", dt.to_string(), err)))?,
205    )
206    .await?)
207}
208
209/// Read a protocol version from the stream.
210#[instrument(skip(r), level = "trace")]
211pub async fn read_proto<R: AsyncReadExt + Unpin>(r: &mut R) -> Result<Proto> {
212    Ok(read_u64(r)
213        .await
214        .map(|raw| raw.into())
215        .tap_ok(|v| trace!(?v, "<-"))?)
216}
217/// Write a protocol version to the stream.
218#[instrument(skip(w, v), level = "trace")]
219pub async fn write_proto<W: AsyncWriteExt + Unpin>(w: &mut W, v: Proto) -> Result<()> {
220    Ok(write_u64(w, v.tap(|v| trace!(?v, "->")).into()).await?)
221}
222
223/// Read an opcode from the stream.
224#[instrument(skip(r), level = "trace")]
225pub async fn read_op<R: AsyncReadExt + Unpin>(r: &mut R) -> Result<Op> {
226    Ok(read_u64(r).await?.try_into().tap_ok(|v| trace!(?v, "<-"))?)
227}
228/// Write an opcode to the stream.
229#[instrument(skip(w, v), level = "trace")]
230pub async fn write_op<W: AsyncWriteExt + Unpin>(w: &mut W, v: Op) -> Result<()> {
231    Ok(write_u64(w, v.tap(|v| trace!(?v, "->")).into()).await?)
232}
233
234/// Read a verbosity level from the stream.
235#[instrument(skip(r), level = "trace")]
236pub async fn read_verbosity<R: AsyncReadExt + Unpin>(r: &mut R) -> Result<Verbosity> {
237    Ok(read_u64(r).await?.try_into().tap_ok(|v| trace!(?v, "<-"))?)
238}
239/// Write a verbosity level to the stream.
240#[instrument(skip(w, v), level = "trace")]
241pub async fn write_verbosity<W: AsyncWriteExt + Unpin>(w: &mut W, v: Verbosity) -> Result<()> {
242    Ok(write_u64(w, v.tap(|v| trace!(?v, "->")).into()).await?)
243}
244
245/// Read a build mode from the stream.
246#[instrument(skip(r), level = "trace")]
247pub async fn read_build_mode<R: AsyncReadExt + Unpin>(r: &mut R) -> Result<BuildMode> {
248    Ok(read_u64(r).await?.try_into().tap_ok(|v| trace!(?v, "<-"))?)
249}
250/// Write a build mode to the stream.
251#[instrument(skip(w, v), level = "trace")]
252pub async fn write_build_mode<W: AsyncWriteExt + Unpin>(
253    w: &mut W,
254    v: BuildMode,
255) -> std::io::Result<()> {
256    write_u64(w, v.tap(|v| trace!(?v, "->")).into()).await
257}
258
259/// Read a build result status from the stream.
260#[instrument(skip(r), level = "trace")]
261pub async fn read_build_result_status<R: AsyncReadExt + Unpin>(
262    r: &mut R,
263) -> Result<BuildResultStatus> {
264    Ok(read_u64(r).await?.try_into().tap_ok(|v| trace!(?v, "<-"))?)
265}
266/// Write a build result status to the stream.
267#[instrument(skip(w, v), level = "trace")]
268pub async fn write_build_result_status<W: AsyncWriteExt + Unpin>(
269    w: &mut W,
270    v: BuildResultStatus,
271) -> std::io::Result<()> {
272    write_u64(w, v.tap(|v| trace!(?v, "->")).into()).await
273}
274
275/// Read a string from the stream. Strings are prefixed with a u64 length, but the
276/// data is padded to the next 8-byte boundary, eg. a 1-byte string becomes 16 bytes
277/// on the wire: 8 for the length, 1 for the data, then 7 bytes of discarded 0x00s.
278#[instrument(skip(r), level = "trace")]
279pub async fn read_string<R: AsyncReadExt + Unpin>(r: &mut R) -> std::io::Result<String> {
280    let len = read_u64(r).await? as usize;
281    let padded_len = len + if len % 8 > 0 { 8 - (len % 8) } else { 0 };
282    if padded_len <= 1024 {
283        let mut buf = [0u8; 1024];
284        r.read_exact(&mut buf[..padded_len]).await?;
285        Ok(String::from_utf8_lossy(&buf[..len]).to_string())
286    } else {
287        let mut buf = vec![0u8; padded_len];
288        r.read_exact(&mut buf[..padded_len]).await?;
289        Ok(String::from_utf8_lossy(&buf[..len]).to_string())
290    }
291    .tap_ok(|v| trace!(v, "<-"))
292}
293
294/// Write a string to the stream. See: NixReader::read_string.
295#[instrument(skip(w, s), level = "trace")]
296pub async fn write_string<W: AsyncWriteExt + Unpin, S: AsRef<str> + Debug>(
297    w: &mut W,
298    s: S,
299) -> std::io::Result<()> {
300    trace!(v=?s,"->");
301    let truncated = s.as_ref().split(|b| b == '\0').next().ok_or_else(|| {
302        std::io::Error::new(
303            std::io::ErrorKind::UnexpectedEof,
304            Error::Invalid("slice::split() returned an empty iterator".to_string()),
305        )
306    })?;
307    let b = truncated.as_bytes();
308    write_u64(w, b.len().try_into().unwrap()).await?;
309    if b.len() > 0 {
310        w.write_all(b).await?;
311        trace!(v = truncated, "->");
312        if b.len() % 8 > 0 {
313            let pad_buf = [0u8; 7];
314            let pad_len = 8 - (b.len() % 8);
315            w.write_all(&pad_buf[..pad_len]).await?;
316            trace!(pad_len, "[ padding ]");
317        }
318    }
319    Ok(())
320}
321
322/// Read a list (or set) of strings from the stream - a u64 count, followed by that
323/// many strings using the normal `read_string()` encoding.
324#[instrument(skip(r), level = "trace")]
325pub fn read_strings<R: AsyncReadExt + Unpin>(r: &mut R) -> impl Stream<Item = Result<String>> + '_ {
326    try_stream! {
327        let count = read_u64(r).await.with_field("<count>")? as usize;
328        for _ in 0..count {
329            yield read_string(r).await?;
330        }
331    }
332}
333/// Write a list of strings to the stream.
334#[instrument(skip(w, si), level = "trace")]
335pub async fn write_strings<W: AsyncWriteExt + Unpin, I>(w: &mut W, si: I) -> std::io::Result<()>
336where
337    I: IntoIterator + Send,
338    I::IntoIter: ExactSizeIterator + Send,
339    I::Item: AsRef<str> + Send + Sync,
340{
341    let si = si.into_iter();
342    write_u64(w, si.len().try_into().unwrap()).await?;
343    for s in si {
344        write_string(w, s.as_ref()).await?;
345    }
346    Ok(())
347}
348
349/// Read a NixError struct from the stream.
350#[instrument(skip(r), level = "trace")]
351pub async fn read_error<R: AsyncReadExt + Unpin>(r: &mut R) -> Result<NixError> {
352    match read_string(r).await {
353        Err(err) => Err(err.into()),
354        Ok(s) if s.as_str() == "Error" => Ok(()),
355        Ok(s) => Err(Error::Invalid(format!("expected 'Error', got '{}'", s))),
356    }
357    .with_field("Error.__unused_type_1")?;
358
359    let level = read_verbosity(r).await.with_field("Error.level")?;
360
361    match read_string(r).await {
362        Err(err) => Err(err.into()),
363        Ok(s) if s.as_str() == "Error" => Ok(()),
364        Ok(s) => Err(Error::Invalid(format!("expected 'Error', got '{}'", s))),
365    }
366    .with_field("Error.__unused_type_2")?;
367
368    let msg = read_string(r).await.with_field("Error.msg")?;
369
370    read_u64(r).await.with_field("Error.__unused_err_pos")?;
371
372    let num_traces = read_u64(r).await.with_field("Error.traces[].<count>")?;
373    let mut traces = Vec::with_capacity(num_traces.try_into().unwrap_or_default());
374    for _ in 0..num_traces {
375        read_u64(r)
376            .await
377            .with_field("Error.traces[].__unused_pos")?;
378        traces.push(read_string(r).await.with_field("Error.traces[].hint")?);
379    }
380
381    Ok(NixError { level, msg, traces })
382}
383
384/// Write a NixError struct to the stream.
385#[instrument(skip(w, v), level = "trace")]
386pub async fn write_error<W: AsyncWriteExt + Unpin>(w: &mut W, v: NixError) -> Result<()> {
387    write_string(w, "Error")
388        .await
389        .with_field("Error.__unused_type_1")?;
390
391    write_verbosity(w, v.level)
392        .await
393        .with_field("Error.level")?;
394
395    write_string(w, "Error")
396        .await
397        .with_field("Error.__unused_type_2")?;
398
399    write_string(w, v.msg).await.with_field("Error.msg")?;
400
401    write_u64(w, 0).await.with_field("Error.__unused_err_pos")?;
402
403    write_u64(w, v.traces.len() as u64)
404        .await
405        .with_field("Error.traces[].<count>")?;
406    for trace in v.traces.iter() {
407        write_u64(w, 0)
408            .await
409            .with_field("Error.traces[].__unused_pos")?;
410        write_string(w, trace)
411            .await
412            .with_field("Error.traces[].hint")?;
413    }
414
415    Ok(())
416}
417
418#[instrument(skip(r), level = "trace")]
419pub async fn read_build_result<R: AsyncReadExt + Unpin>(
420    r: &mut R,
421    proto: Proto,
422) -> Result<BuildResult> {
423    let status = read_build_result_status(r)
424        .await
425        .with_field("BuildResult.status")?;
426    let error_msg = read_string(r).await.with_field("BuildResult.error_msg")?;
427
428    let mut br = BuildResult {
429        status,
430        error_msg,
431        times_built: 0,
432        is_non_deterministic: false,
433        start_time: DateTime::default(),
434        stop_time: DateTime::default(),
435        built_outputs: HashMap::default(),
436    };
437
438    if proto >= Proto(1, 29) {
439        br.times_built = read_u64(r).await.with_field("BuildResult.times_built")?;
440        br.is_non_deterministic = read_bool(r)
441            .await
442            .with_field("BuildResult.is_non_deterministic")?;
443        br.start_time = read_datetime(r)
444            .await
445            .with_field("BuildResult.start_time")?;
446        br.stop_time = read_datetime(r).await.with_field("BuildResult.stop_time")?;
447    }
448    if proto >= Proto(1, 28) {
449        let count = read_u64(r)
450            .await
451            .with_field("BuildResult.built_outputs.<count>")? as usize;
452        for _ in 0..count {
453            let name = read_string(r)
454                .await
455                .with_field("BuildResult.built_outputs[].name")?;
456            let path = read_string(r)
457                .await
458                .with_field("BuildResult.built_outputs[].path")?;
459            br.built_outputs.insert(name, path);
460        }
461    }
462
463    Ok(br)
464}
465
466#[instrument(skip(w), level = "trace")]
467pub async fn write_build_result<W: AsyncWriteExt + Unpin>(
468    w: &mut W,
469    result: &BuildResult,
470    proto: Proto,
471) -> Result<()> {
472    write_build_result_status(w, result.status)
473        .await
474        .with_field("BuildResult.status")?;
475    write_string(w, &result.error_msg)
476        .await
477        .with_field("BuildResult.error_msg")?;
478
479    if proto >= Proto(1, 29) {
480        write_u64(w, result.times_built)
481            .await
482            .with_field("BuildResult.times_built")?;
483        write_bool(w, result.is_non_deterministic)
484            .await
485            .with_field("BuildResult.is_non_deterministic")?;
486        write_datetime(w, result.start_time)
487            .await
488            .with_field("BuildResult.start_time")?;
489        write_datetime(w, result.stop_time)
490            .await
491            .with_field("BuildResult.stop_time")?;
492    }
493    if proto >= Proto(1, 28) {
494        write_u64(w, result.built_outputs.len() as u64)
495            .await
496            .with_field("BuildResult.built_outputs.<count>")?;
497        for (name, path) in &result.built_outputs {
498            write_string(w, name)
499                .await
500                .with_field("BuildResult.built_outputs[].name")?;
501            write_string(w, path)
502                .await
503                .with_field("BuildResult.built_outputs[].path")?;
504        }
505    }
506
507    Ok(())
508}
509
510#[derive(Debug, TryFromPrimitive, IntoPrimitive)]
511#[repr(u64)]
512pub enum StderrKind {
513    Next = 0x6f6c6d67,
514    Last = 0x616c7473,
515    Error = 0x63787470,
516    StartActivity = 0x53545254,
517    StopActivity = 0x53544f50,
518    Result = 0x52534c54,
519}
520
521#[instrument(skip(r), level = "trace")]
522pub async fn read_stderr<R: AsyncReadExt + Unpin>(r: &mut R) -> Result<Option<Stderr>> {
523    let kind = StderrKind::try_from(read_u64(r).await?)
524        .map_err(|TryFromPrimitiveError { number }| {
525            Error::Invalid(format!("Stderr<{:#x}>", number))
526        })?
527        .tap(|kind| trace!(?kind, "<-"));
528
529    match kind {
530        StderrKind::Last => Ok(None),
531        StderrKind::Next => Ok(Some(Stderr::Next(read_string(r).await?))),
532        StderrKind::Error => Ok(Some(Stderr::Error(read_error(r).await?))),
533        StderrKind::StartActivity => Ok(Some(Stderr::StartActivity(
534            read_stderr_start_activity(r).await?,
535        ))),
536        StderrKind::StopActivity => Ok(Some(Stderr::StopActivity {
537            act_id: read_u64(r).await?,
538        })),
539        StderrKind::Result => Ok(Some(Stderr::Result(read_stderr_result(r).await?))),
540    }
541    .tap_ok(|stderr| trace!(?stderr, "<-"))
542}
543#[instrument(skip(r), level = "trace")]
544pub async fn read_stderr_start_activity<R: AsyncReadExt + Unpin>(
545    r: &mut R,
546) -> Result<StderrStartActivity> {
547    Ok(StderrStartActivity {
548        act_id: read_u64(r).await?,
549        level: read_verbosity(r).await?,
550        kind: read_u64(r).await?.try_into()?,
551        s: read_string(r).await?,
552        fields: read_stderr_fields(r).await?,
553        parent_id: read_u64(r).await?,
554    }
555    .tap(|act| trace!(?act, "<-")))
556}
557#[instrument(skip(r), level = "trace")]
558pub async fn read_stderr_result<R: AsyncReadExt + Unpin>(r: &mut R) -> Result<StderrResult> {
559    Ok(StderrResult {
560        act_id: read_u64(r).await?,
561        kind: read_u64(r).await?.try_into()?,
562        fields: read_stderr_fields(r).await?,
563    }
564    .tap(|res| trace!(?res, "<-")))
565}
566#[instrument(skip(r), level = "trace")]
567pub async fn read_stderr_fields<R: AsyncReadExt + Unpin>(r: &mut R) -> Result<Vec<StderrField>> {
568    let count = read_u64(r)
569        .await
570        .with_field("StartActivity.fields.<count>")?
571        .tap(|count| trace!(count, "fields[].<count>")) as usize;
572    let mut fields = Vec::with_capacity(count);
573    for n in 0..count {
574        fields.push(
575            match read_u64(r)
576                .await
577                .with_field("StartActivity.fields[].<type>")?
578            {
579                0 => Ok(StderrField::Int(read_u64(r).await?)),
580                1 => Ok(StderrField::String(read_string(r).await?)),
581                v => Err(Error::Invalid(format!("<type>({})", v))),
582            }
583            .with_field("StartActivity.fields[]")?
584            .tap(|v| trace!(n, count, ?v, "fields[]")),
585        )
586    }
587    Ok(fields)
588}
589
590#[instrument(skip(w), level = "trace")]
591pub async fn write_stderr<W: AsyncWriteExt + Unpin>(w: &mut W, v: Option<Stderr>) -> Result<()> {
592    trace!(?v, "->");
593    match v {
594        None => write_u64(w, StderrKind::Last.into()).await?,
595        Some(Stderr::Next(s)) => {
596            write_u64(w, StderrKind::Next.into()).await?;
597            write_string(w, s).await?;
598        }
599        Some(Stderr::Error(err)) => {
600            write_u64(w, StderrKind::Error.into()).await?;
601            write_error(w, err).await?;
602        }
603        Some(Stderr::StartActivity(start)) => {
604            write_u64(w, StderrKind::StartActivity.into()).await?;
605            write_stderr_start_activity(w, start).await?;
606        }
607        Some(Stderr::StopActivity { act_id }) => {
608            write_u64(w, StderrKind::StopActivity.into()).await?;
609            write_u64(w, act_id).await?;
610        }
611        Some(Stderr::Result(res)) => {
612            write_u64(w, StderrKind::Result.into()).await?;
613            write_stderr_result(w, res).await?;
614        }
615    }
616    Ok(())
617}
618#[instrument(skip(w, v), level = "trace")]
619pub async fn write_stderr_start_activity<W: AsyncWriteExt + Unpin>(
620    w: &mut W,
621    v: StderrStartActivity,
622) -> Result<()> {
623    trace!(?v, "->");
624    write_u64(w, v.act_id).await?;
625    write_verbosity(w, v.level).await?;
626    write_u64(w, v.kind.into()).await?;
627    write_string(w, v.s).await?;
628    write_stderr_fields(w, v.fields).await?;
629    write_u64(w, v.parent_id).await?;
630    Ok(())
631}
632#[instrument(skip(w, v), level = "trace")]
633pub async fn write_stderr_result<W: AsyncWriteExt + Unpin>(
634    w: &mut W,
635    v: StderrResult,
636) -> Result<()> {
637    trace!(?v, "->");
638    write_u64(w, v.act_id).await?;
639    write_u64(w, v.kind.into()).await?;
640    write_stderr_fields(w, v.fields).await?;
641    Ok(())
642}
643#[instrument(skip(w, vs), level = "trace")]
644pub async fn write_stderr_fields<W: AsyncWriteExt + Unpin, I>(w: &mut W, vs: I) -> Result<()>
645where
646    I: IntoIterator + Send,
647    I::IntoIter: ExactSizeIterator<Item = StderrField> + Send,
648{
649    let vs = vs.into_iter();
650    write_u64(w, vs.len() as u64)
651        .await
652        .with_field("StartActivity.fields.<count>")?;
653    for field in vs {
654        match field {
655            StderrField::Int(v) => {
656                write_u64(w, 0)
657                    .await
658                    .with_field("StartActivity.fields[].<type>")?;
659                write_u64(w, v).await.with_field("StartActivity.fields[]")?;
660            }
661            StderrField::String(v) => {
662                write_u64(w, 0)
663                    .await
664                    .with_field("StartActivity.fields[].<type>")?;
665                write_string(w, v)
666                    .await
667                    .with_field("StartActivity.fields[]")?;
668            }
669        }
670    }
671    Ok(())
672}
673
674/// Read a ClientSettings structure from the stream.
675#[instrument(skip(r), level = "trace")]
676pub async fn read_client_settings<R: AsyncReadExt + Unpin>(
677    r: &mut R,
678    proto: Proto,
679) -> Result<ClientSettings> {
680    let keep_failed = read_bool(r)
681        .await
682        .with_field("ClientSettings.keep_failed")?;
683    let keep_going = read_bool(r).await.with_field("ClientSettings.keep_going")?;
684    let try_fallback = read_bool(r)
685        .await
686        .with_field("ClientSettings.try_fallback")?;
687    let verbosity = read_verbosity(r)
688        .await
689        .with_field("ClientSettings.verbosity")?;
690    let max_build_jobs = read_u64(r)
691        .await
692        .with_field("ClientSettings.max_build_jobs")?;
693    let max_silent_time = read_u64(r)
694        .await
695        .with_field("ClientSettings.max_silent_time")?;
696    read_u64(r)
697        .await
698        .with_field("ClientSettings.__obsolete_use_build_hook")?;
699    let verbose_build = read_verbosity(r)
700        .await
701        .map(|v| v == Verbosity::Error)
702        .with_field("ClientSettings.verbose_build")?;
703    read_u64(r)
704        .await
705        .with_field("ClientSettings.__obsolete_log_type")?;
706    read_u64(r)
707        .await
708        .with_field("ClientSettings.__obsolete_print_build_trace")?;
709    let build_cores = read_u64(r).await.with_field("ClientSettings.build_cores")?;
710    let use_substitutes = read_bool(r)
711        .await
712        .with_field("ClientSettings.use_substitutes")?;
713
714    let overrides = if proto >= Proto(1, 12) {
715        let count = read_u64(r)
716            .await
717            .with_field("ClientSettings.overrides.<count>")? as usize;
718        let mut overrides = HashMap::with_capacity(count as usize);
719        for _ in 0..count {
720            let key = read_string(r)
721                .await
722                .with_field("ClientSettings.overrides[].key")?;
723            let value = read_string(r)
724                .await
725                .with_field("ClientSettings.overrides[].value")?;
726            overrides.insert(key, value);
727        }
728        overrides
729    } else {
730        HashMap::with_capacity(0)
731    };
732
733    Ok(ClientSettings {
734        keep_failed,
735        keep_going,
736        try_fallback,
737        verbosity,
738        max_build_jobs,
739        max_silent_time,
740        verbose_build,
741        build_cores,
742        use_substitutes,
743        overrides,
744    })
745}
746/// Writes a ClientSettings structure to the stream.
747#[instrument(skip(w, cs), level = "trace")]
748pub async fn write_client_settings<W: AsyncWriteExt + Unpin>(
749    w: &mut W,
750    proto: Proto,
751    cs: &ClientSettings,
752) -> Result<()> {
753    write_bool(w, cs.keep_failed)
754        .await
755        .with_field("ClientSettings.keep_failed")?;
756    write_bool(w, cs.keep_going)
757        .await
758        .with_field("ClientSettings.keep_going")?;
759    write_bool(w, cs.try_fallback)
760        .await
761        .with_field("ClientSettings.try_fallback")?;
762
763    write_verbosity(w, cs.verbosity)
764        .await
765        .with_field("ClientSettings.verbosity")?;
766    write_u64(w, cs.max_build_jobs)
767        .await
768        .with_field("ClientSettings.max_build_jobs")?;
769    write_u64(w, cs.max_silent_time)
770        .await
771        .with_field("ClientSettings.max_silent_time")?;
772    write_u64(w, 0)
773        .await
774        .with_field("ClientSettings.__obsolete_use_build_hook")?;
775    write_verbosity(
776        w,
777        if cs.verbose_build {
778            Verbosity::Error
779        } else {
780            Verbosity::Vomit
781        },
782    )
783    .await
784    .with_field("ClientSettings.verbose_build")?;
785    write_u64(w, 0)
786        .await
787        .with_field("ClientSettings.__obsolete_log_type")?;
788    write_u64(w, 0)
789        .await
790        .with_field("ClientSettings.__obsolete_print_build_trace")?;
791    write_u64(w, cs.build_cores)
792        .await
793        .with_field("ClientSettings.build_cores")?;
794    write_bool(w, cs.use_substitutes)
795        .await
796        .with_field("ClientSettings.use_substitutes")?;
797
798    if proto >= Proto(1, 12) {
799        write_u64(w, cs.overrides.len() as u64)
800            .await
801            .with_field("ClientSettings.overrides.<count>")?;
802        for (key, value) in cs.overrides.iter() {
803            write_string(w, key)
804                .await
805                .with_field("ClientSettings.overrides[].key")?;
806            write_string(w, value)
807                .await
808                .with_field("ClientSettings.overrides[].value")?;
809        }
810    }
811
812    Ok(())
813}
814
815/// Read a PathInfo structure from the stream.
816#[instrument(skip(r), level = "trace")]
817pub async fn read_pathinfo<R: AsyncReadExt + Unpin>(r: &mut R, proto: Proto) -> Result<PathInfo> {
818    let deriver = read_string(r)
819        .await
820        .map(|s| (!s.is_empty()).then_some(s)) // "" -> None.
821        .with_field("PathInfo.deriver")?;
822    let nar_hash = read_string(r).await.with_field("PathInfo.nar_hash")?;
823    let references = read_strings(r)
824        .collect::<Result<Vec<_>>>()
825        .await
826        .with_field("PathInfo.deriver")?;
827    let registration_time = read_datetime(r)
828        .await
829        .with_field("PathInfo.registration_time")?;
830    let nar_size = read_u64(r).await.with_field("PathInfo.nar_size")?;
831
832    let ultimate = OptionFuture::from(proto.since(16).then(|| read_bool(r)))
833        .await
834        .transpose()
835        .with_field("PathInfo.ultimate")?
836        .unwrap_or_default();
837    let signatures = OptionFuture::from(proto.since(16).then(|| read_strings(r).collect()))
838        .await
839        .transpose()
840        .with_field("PathInfo.signatures")?
841        .unwrap_or_default();
842    let ca = OptionFuture::from(proto.since(16).then(|| read_string(r)))
843        .await
844        .transpose()
845        .with_field("PathInfo.ca")?
846        .and_then(|s| (!s.is_empty()).then_some(s)); // "" -> None.
847
848    Ok(PathInfo {
849        deriver,
850        nar_hash,
851        references,
852        registration_time,
853        nar_size,
854        ultimate,
855        signatures,
856        ca,
857    })
858}
859/// Write a PathInfo structure to the stream.
860#[instrument(skip(w, pi), level = "trace")]
861pub async fn write_pathinfo<W: AsyncWriteExt + Unpin>(
862    w: &mut W,
863    proto: Proto,
864    pi: &PathInfo,
865) -> Result<()> {
866    write_string(w, pi.deriver.as_ref().map(|s| s.as_str()).unwrap_or(""))
867        .await
868        .with_field("PathInfo.deriver")?;
869    write_string(w, pi.nar_hash.as_str())
870        .await
871        .with_field("PathInfo.nar_hash")?;
872    write_strings(w, &pi.references)
873        .await
874        .with_field("PathInfo.deriver")?;
875    write_u64(w, pi.registration_time.timestamp().try_into().unwrap())
876        .await
877        .with_field("PathInfo.registration_time")?;
878    write_u64(w, pi.nar_size)
879        .await
880        .with_field("PathInfo.nar_size")?;
881
882    if proto.since(16) {
883        write_bool(w, pi.ultimate)
884            .await
885            .with_field("PathInfo.ultimate")?;
886        write_strings(w, &pi.signatures)
887            .await
888            .with_field("PathInfo.signatures")?;
889        write_string(w, &pi.ca.as_ref().map(|s| s.as_str()).unwrap_or(""))
890            .await
891            .with_field("PathInfo.ca")?;
892    }
893    Ok(())
894}
895
896#[cfg(test)]
897mod tests {
898    use super::*;
899    use chrono::{TimeZone, Utc};
900    use tokio_stream::StreamExt;
901    use tokio_test::io::Builder;
902
903    fn pad_str<const L: usize>(s: &str) -> [u8; L] {
904        assert!(L % 8 == 0, "{} is not aligned to 8", L);
905        let mut v = [0u8; L];
906        (&mut v[..s.len()]).copy_from_slice(s.as_bytes());
907        v
908    }
909
910    #[tokio::test]
911    async fn test_copy_to_framed_empty() {
912        let mut r = Builder::new().read(&[]).build();
913        let mut w = Builder::new().write(&0u64.to_le_bytes()).build();
914        let mut buf = [0u8; 64];
915        copy_to_framed(&mut r, &mut w, &mut buf).await.unwrap();
916    }
917
918    #[tokio::test]
919    async fn test_copy_to_framed_1() {
920        let mut r = Builder::new().read(&[1, 2, 3, 4]).build();
921        let mut w = Builder::new()
922            .write(&4u64.to_le_bytes())
923            .write(&[1, 2, 3, 4])
924            .write(&0u64.to_le_bytes())
925            .build();
926        let mut buf = [0u8; 64];
927        copy_to_framed(&mut r, &mut w, &mut buf).await.unwrap();
928    }
929
930    #[tokio::test]
931    async fn test_copy_to_framed_2reads() {
932        // 10 bytes split across 2 reads.
933        let mut r = Builder::new()
934            .read(&[1, 2, 3, 4])
935            .read(&[5, 6, 7, 8, 9, 10])
936            .build();
937        let mut w = Builder::new()
938            .write(&4u64.to_le_bytes())
939            .write(&[1, 2, 3, 4])
940            .write(&6u64.to_le_bytes())
941            .write(&[5, 6, 7, 8, 9, 10])
942            .write(&0u64.to_le_bytes())
943            .build();
944        let mut buf = [0u8; 64];
945        copy_to_framed(&mut r, &mut w, &mut buf).await.unwrap();
946    }
947
948    #[tokio::test]
949    async fn test_copy_to_framed_3buffers() {
950        // 5 bytes available, but buffer is only 2 bytes.
951        let mut r = Builder::new().read(&[1, 2, 3, 4, 5]).build();
952        let mut w = Builder::new()
953            .write(&2u64.to_le_bytes())
954            .write(&[1, 2])
955            .write(&2u64.to_le_bytes())
956            .write(&[3, 4])
957            .write(&1u64.to_le_bytes())
958            .write(&[5])
959            .write(&0u64.to_le_bytes())
960            .build();
961        let mut buf = [0u8; 2];
962        copy_to_framed(&mut r, &mut w, &mut buf).await.unwrap();
963    }
964
965    // Integers.
966    #[tokio::test]
967    async fn test_read_u64() {
968        let mut mock = Builder::new().read(&1234567890u64.to_le_bytes()).build();
969        assert_eq!(1234567890u64, read_u64(&mut mock).await.unwrap());
970    }
971    #[tokio::test]
972    async fn test_write_u64() {
973        let mut mock = Builder::new().write(&1234567890u64.to_le_bytes()).build();
974        write_u64(&mut mock, 1234567890).await.unwrap();
975    }
976
977    // Booleans.
978    #[tokio::test]
979    async fn test_read_bool_0() {
980        let mut mock = Builder::new().read(&0u64.to_le_bytes()).build();
981        assert_eq!(false, read_bool(&mut mock).await.unwrap());
982    }
983    #[tokio::test]
984    async fn test_read_bool_1() {
985        let mut mock = Builder::new().read(&1u64.to_le_bytes()).build();
986        assert_eq!(true, read_bool(&mut mock).await.unwrap());
987    }
988    #[tokio::test]
989    async fn test_read_bool_2() {
990        let mut mock = Builder::new().read(&2u64.to_le_bytes()).build();
991        assert_eq!(true, read_bool(&mut mock).await.unwrap());
992    }
993
994    #[tokio::test]
995    async fn test_write_bool_false() {
996        let mut mock = Builder::new().write(&0u64.to_le_bytes()).build();
997        write_bool(&mut mock, false).await.unwrap();
998    }
999    #[tokio::test]
1000    async fn test_write_bool_true() {
1001        let mut mock = Builder::new().write(&1u64.to_le_bytes()).build();
1002        write_bool(&mut mock, true).await.unwrap();
1003    }
1004
1005    // Protocol versions.
1006    #[tokio::test]
1007    async fn test_read_proto() {
1008        // Why are they this way around?? Is this right?
1009        let mut mock = Builder::new().read(&[34, 12, 0, 0, 0, 0, 0, 0]).build();
1010        assert_eq!(Proto(12, 34), read_proto(&mut mock).await.unwrap());
1011    }
1012    #[tokio::test]
1013    async fn test_write_proto() {
1014        let mut mock = Builder::new().write(&[34, 12, 0, 0, 0, 0, 0, 0]).build();
1015        write_proto(&mut mock, Proto(12, 34)).await.unwrap();
1016    }
1017
1018    // Verbosity.
1019    #[tokio::test]
1020    async fn test_read_verbosity() {
1021        let mut m = Builder::new()
1022            .read(&0u64.to_le_bytes()) // Error
1023            .read(&1u64.to_le_bytes()) // Warn
1024            .read(&2u64.to_le_bytes()) // Notice
1025            .read(&3u64.to_le_bytes()) // Info
1026            .read(&4u64.to_le_bytes()) // Talkative
1027            .read(&5u64.to_le_bytes()) // Chatty
1028            .read(&6u64.to_le_bytes()) // Debug
1029            .read(&7u64.to_le_bytes()) // Vomit
1030            .build();
1031        assert_eq!(Verbosity::Error, read_verbosity(&mut m).await.unwrap());
1032        assert_eq!(Verbosity::Warn, read_verbosity(&mut m).await.unwrap());
1033        assert_eq!(Verbosity::Notice, read_verbosity(&mut m).await.unwrap());
1034        assert_eq!(Verbosity::Info, read_verbosity(&mut m).await.unwrap());
1035        assert_eq!(Verbosity::Talkative, read_verbosity(&mut m).await.unwrap());
1036        assert_eq!(Verbosity::Chatty, read_verbosity(&mut m).await.unwrap());
1037        assert_eq!(Verbosity::Debug, read_verbosity(&mut m).await.unwrap());
1038        assert_eq!(Verbosity::Vomit, read_verbosity(&mut m).await.unwrap());
1039    }
1040    #[tokio::test]
1041    async fn test_write_verbosity() {
1042        let mut m = Builder::new()
1043            .write(&0u64.to_le_bytes()) // Error
1044            .write(&1u64.to_le_bytes()) // Warn
1045            .write(&2u64.to_le_bytes()) // Notice
1046            .write(&3u64.to_le_bytes()) // Info
1047            .write(&4u64.to_le_bytes()) // Talkative
1048            .write(&5u64.to_le_bytes()) // Chatty
1049            .write(&6u64.to_le_bytes()) // Debug
1050            .write(&7u64.to_le_bytes()) // Vomit
1051            .build();
1052        write_verbosity(&mut m, Verbosity::Error).await.unwrap();
1053        write_verbosity(&mut m, Verbosity::Warn).await.unwrap();
1054        write_verbosity(&mut m, Verbosity::Notice).await.unwrap();
1055        write_verbosity(&mut m, Verbosity::Info).await.unwrap();
1056        write_verbosity(&mut m, Verbosity::Talkative).await.unwrap();
1057        write_verbosity(&mut m, Verbosity::Chatty).await.unwrap();
1058        write_verbosity(&mut m, Verbosity::Debug).await.unwrap();
1059        write_verbosity(&mut m, Verbosity::Vomit).await.unwrap();
1060    }
1061
1062    // Short strings.
1063    #[tokio::test]
1064    async fn test_read_string_len_0() {
1065        let mut mock = Builder::new().read(&0u64.to_le_bytes()).build();
1066        assert_eq!("".to_string(), read_string(&mut mock).await.unwrap());
1067    }
1068    #[tokio::test]
1069    async fn test_read_string_len_1() {
1070        let mut mock = Builder::new()
1071            .read(&1u64.to_le_bytes())
1072            .read("a".as_bytes())
1073            .read(&[0u8; 7])
1074            .build();
1075        assert_eq!("a".to_string(), read_string(&mut mock).await.unwrap());
1076    }
1077    #[tokio::test]
1078    async fn test_read_string_len_8() {
1079        let mut mock = Builder::new()
1080            .read(&8u64.to_le_bytes())
1081            .read("i'm gay.".as_bytes())
1082            .build();
1083        assert_eq!(
1084            "i'm gay.".to_string(),
1085            read_string(&mut mock).await.unwrap()
1086        );
1087    }
1088
1089    #[tokio::test]
1090    async fn test_write_string_len_0() {
1091        let mut mock = Builder::new().write(&0u64.to_le_bytes()).build();
1092        write_string(&mut mock, "").await.unwrap();
1093    }
1094    #[tokio::test]
1095    async fn test_write_string_len_1() {
1096        let mut mock = Builder::new()
1097            .write(&1u64.to_le_bytes())
1098            .write("a\0\0\0\0\0\0\0".as_bytes())
1099            .build();
1100        write_string(&mut mock, "a").await.unwrap();
1101    }
1102    #[tokio::test]
1103    async fn test_write_string_len_8() {
1104        let mut mock = Builder::new()
1105            .write(&8u64.to_le_bytes())
1106            .write("i'm gay.".as_bytes())
1107            .build();
1108        write_string(&mut mock, "i'm gay.").await.unwrap();
1109    }
1110
1111    // Long strings (infinite screaming).
1112    #[tokio::test]
1113    async fn test_read_string_len_1024() {
1114        let mut mock = Builder::new()
1115            .read(&1024u64.to_le_bytes())
1116            .read(&['a' as u8; 1024])
1117            .build();
1118        assert_eq!(
1119            String::from_iter(std::iter::repeat('a').take(1024)),
1120            read_string(&mut mock).await.unwrap()
1121        );
1122    }
1123    #[tokio::test]
1124    async fn test_read_string_len_1025() {
1125        let mut mock = Builder::new()
1126            .read(&1025u64.to_le_bytes())
1127            .read(&['a' as u8; 1025])
1128            .read(&[0u8; 7])
1129            .build();
1130        assert_eq!(
1131            String::from_iter(std::iter::repeat('a').take(1025)),
1132            read_string(&mut mock).await.unwrap()
1133        );
1134    }
1135    #[tokio::test]
1136    async fn test_read_string_len_2048() {
1137        let mut mock = Builder::new()
1138            .read(&2048u64.to_le_bytes())
1139            .read(&['a' as u8; 2048])
1140            .build();
1141        assert_eq!(
1142            String::from_iter(std::iter::repeat('a').take(2048)),
1143            read_string(&mut mock).await.unwrap()
1144        );
1145    }
1146
1147    #[tokio::test]
1148    async fn test_read_strings_0() {
1149        let mut mock = Builder::new().read(&0u64.to_le_bytes()).build();
1150        assert_eq!(
1151            Vec::<String>::new(),
1152            read_strings(&mut mock)
1153                .collect::<Result<Vec<_>>>()
1154                .await
1155                .unwrap()
1156        );
1157    }
1158    #[tokio::test]
1159    async fn test_read_strings_1() {
1160        let mut mock = Builder::new()
1161            .read(&1u64.to_le_bytes())
1162            .read(&8u64.to_le_bytes())
1163            .read("i'm gay.".as_bytes())
1164            .build();
1165        assert_eq!(
1166            vec!["i'm gay.".to_string()],
1167            read_strings(&mut mock)
1168                .collect::<Result<Vec<_>>>()
1169                .await
1170                .unwrap()
1171        );
1172    }
1173    #[tokio::test]
1174    async fn test_read_strings_4() {
1175        let mut mock = Builder::new()
1176            .read(&4u64.to_le_bytes())
1177            .read(&22u64.to_le_bytes())
1178            .read("according to all known\0\0".as_bytes())
1179            .read(&16u64.to_le_bytes())
1180            .read("laws of aviation".as_bytes())
1181            .read(&25u64.to_le_bytes())
1182            .read("there's no way that a bee\0\0\0\0\0\0\0".as_bytes())
1183            .read(&21u64.to_le_bytes())
1184            .read("should be able to fly\0\0\0".as_bytes())
1185            .build();
1186        assert_eq!(
1187            vec![
1188                "according to all known".to_string(),
1189                "laws of aviation".to_string(),
1190                "there's no way that a bee".to_string(),
1191                "should be able to fly".to_string()
1192            ],
1193            read_strings(&mut mock)
1194                .collect::<Result<Vec<_>>>()
1195                .await
1196                .unwrap()
1197        );
1198    }
1199
1200    #[tokio::test]
1201    async fn test_read_pathinfo_derived() {
1202        let mut mock = Builder::new()
1203            .read(&61u64.to_le_bytes()) // deriver
1204            .read(&pad_str::<64>(
1205                "/nix/store/ffffffffffffffffffffffffffffffff-sqlite-3.43.2.drv",
1206            ))
1207            .read(&51u64.to_le_bytes()) // nar_hash
1208            .read(&pad_str::<56>(
1209                "sha256-sUu8vqpIoy7ZpnQPcwvQasNqX2jJOSXeEwd1yFtTukU=",
1210            ))
1211            .read(&2u64.to_le_bytes()) // references[]
1212            .read(&52u64.to_le_bytes()) // references[0]
1213            .read(&pad_str::<56>(
1214                "/nix/store/ffffffffffffffffffffffffffffffff-zlib-1.3",
1215            ))
1216             .read(&57u64.to_le_bytes()) // references[1]
1217             .read(&pad_str::<64>(
1218                 "/nix/store/ffffffffffffffffffffffffffffffff-glibc-2.38-27",
1219             ))
1220             .read(&1700495600u64.to_le_bytes()) // registration_time
1221             .read(&1768960u64.to_le_bytes()) // nar_size
1222             .read(&0u64.to_le_bytes()) // ultimate
1223             .read(&1u64.to_le_bytes()) // signatures[]
1224             .read(&106u64.to_le_bytes()) // signatures[0]
1225             .read(&pad_str::<112>(
1226                 "cache.nixos.org-1:Efz+S0y30Eny+nbjeiS0vlUiEpmNbW+m1CiznlC5odPRpTfQUENj+AQcDsnEgvXmaTY9OqG0l5pMIBc6XAk6AQ==",
1227             ))
1228             .read(&0u64.to_le_bytes()) // ca
1229            .build();
1230        assert_eq!(
1231            PathInfo {
1232                deriver: Some(
1233                    "/nix/store/ffffffffffffffffffffffffffffffff-sqlite-3.43.2.drv".into()
1234                ),
1235                nar_hash: "sha256-sUu8vqpIoy7ZpnQPcwvQasNqX2jJOSXeEwd1yFtTukU=".into(),
1236                references: vec![
1237                    "/nix/store/ffffffffffffffffffffffffffffffff-zlib-1.3".into(),
1238                    "/nix/store/ffffffffffffffffffffffffffffffff-glibc-2.38-27".into(),
1239                ],
1240                registration_time: Utc.with_ymd_and_hms(2023, 11, 20, 15, 53, 20).unwrap(),
1241                nar_size: 1768960,
1242                ultimate: false,
1243                signatures: vec![
1244                    "cache.nixos.org-1:Efz+S0y30Eny+nbjeiS0vlUiEpmNbW+m1CiznlC5odPRpTfQUENj+AQcDsnEgvXmaTY9OqG0l5pMIBc6XAk6AQ==".into(),
1245                ],
1246                ca: None,
1247            },
1248            read_pathinfo(&mut mock, Proto(1, 35)).await.unwrap()
1249        );
1250    }
1251    #[tokio::test]
1252    async fn test_read_pathinfo_ca() {
1253        let mut mock = Builder::new()
1254            .read(&0u64.to_le_bytes()) // deriver
1255            .read(&51u64.to_le_bytes()) // nar_hash
1256            .read(&pad_str::<56>(
1257                "sha256-1JmbR4NOsYNvgbJlqjp+4/bfm22IvhakiE1DXNfx78s=",
1258            ))
1259            .read(&5u64.to_le_bytes()) // references[]
1260            .read(&60u64.to_le_bytes()) // references[0]
1261            .read(&pad_str::<64>(
1262                "/nix/store/ffffffffffffffffffffffffffffffff-bash-5.2-p15.drv",
1263            ))
1264            .read(&58u64.to_le_bytes()) // references[1]
1265            .read(&pad_str::<64>(
1266                "/nix/store/ffffffffffffffffffffffffffffffff-curl-8.4.0.drv",
1267            ))
1268            .read(&54u64.to_le_bytes()) // references[2]
1269            .read(&pad_str::<56>(
1270                "/nix/store/ffffffffffffffffffffffffffffffff-builder.sh",
1271            ))
1272            .read(&60u64.to_le_bytes()) // references[3]
1273            .read(&pad_str::<64>(
1274                "/nix/store/ffffffffffffffffffffffffffffffff-stdenv-linux.drv",
1275            ))
1276            .read(&60u64.to_le_bytes()) // references[4]
1277            .read(&pad_str::<64>(
1278                "/nix/store/ffffffffffffffffffffffffffffffff-mirrors-list.drv",
1279            ))
1280            .read(&1700854586u64.to_le_bytes()) // registration_time
1281            .read(&3008u64.to_le_bytes()) // nar_size
1282            .read(&0u64.to_le_bytes()) // ultimate
1283            .read(&0u64.to_le_bytes()) // signatures[]
1284            .read(&64u64.to_le_bytes()) // ca
1285            .read(&pad_str::<64>(
1286                "text:sha256:0yjycizc8v9950dz9a69a7qlzcba9gl2gls8svi1g1i75xxf206d",
1287            ))
1288            .build();
1289        assert_eq!(
1290            PathInfo {
1291                deriver: None,
1292                nar_hash: "sha256-1JmbR4NOsYNvgbJlqjp+4/bfm22IvhakiE1DXNfx78s=".into(),
1293                references: vec![
1294                    "/nix/store/ffffffffffffffffffffffffffffffff-bash-5.2-p15.drv".into(),
1295                    "/nix/store/ffffffffffffffffffffffffffffffff-curl-8.4.0.drv".into(),
1296                    "/nix/store/ffffffffffffffffffffffffffffffff-builder.sh".into(),
1297                    "/nix/store/ffffffffffffffffffffffffffffffff-stdenv-linux.drv".into(),
1298                    "/nix/store/ffffffffffffffffffffffffffffffff-mirrors-list.drv".into(),
1299                ],
1300                registration_time: Utc.with_ymd_and_hms(2023, 11, 24, 19, 36, 26).unwrap(),
1301                nar_size: 3008,
1302                ultimate: false,
1303                signatures: vec![],
1304                ca: Some("text:sha256:0yjycizc8v9950dz9a69a7qlzcba9gl2gls8svi1g1i75xxf206d".into()),
1305            },
1306            read_pathinfo(&mut mock, Proto(1, 35)).await.unwrap()
1307        );
1308    }
1309
1310    #[tokio::test]
1311    async fn test_write_pathinfo_derived() {
1312        let mut mock = Builder::new()
1313            .write(&61u64.to_le_bytes()) // deriver
1314            .write(&pad_str::<64>(
1315                "/nix/store/ffffffffffffffffffffffffffffffff-sqlite-3.43.2.drv",
1316            ))
1317            .write(&51u64.to_le_bytes()) // nar_hash
1318            .write(&pad_str::<56>(
1319                "sha256-sUu8vqpIoy7ZpnQPcwvQasNqX2jJOSXeEwd1yFtTukU=",
1320            ))
1321            .write(&2u64.to_le_bytes()) // references[]
1322            .write(&52u64.to_le_bytes()) // references[0]
1323            .write(&pad_str::<56>(
1324                "/nix/store/ffffffffffffffffffffffffffffffff-zlib-1.3",
1325            ))
1326            .write(&57u64.to_le_bytes()) // references[1]
1327            .write(&pad_str::<64>(
1328                "/nix/store/ffffffffffffffffffffffffffffffff-glibc-2.38-27",
1329            ))
1330            .write(&1700495600u64.to_le_bytes()) // registration_time
1331            .write(&1768960u64.to_le_bytes()) // nar_size
1332            .write(&0u64.to_le_bytes()) // ultimate
1333            .write(&1u64.to_le_bytes()) // signatures[]
1334            .write(&106u64.to_le_bytes()) // signatures[0]
1335            .write(&pad_str::<112>(
1336                 "cache.nixos.org-1:Efz+S0y30Eny+nbjeiS0vlUiEpmNbW+m1CiznlC5odPRpTfQUENj+AQcDsnEgvXmaTY9OqG0l5pMIBc6XAk6AQ==",
1337             ))
1338            .write(&0u64.to_le_bytes()) // ca
1339            .build();
1340        write_pathinfo(
1341            &mut mock,
1342            Proto(1, 35),
1343            &PathInfo {
1344                deriver: Some(
1345                    "/nix/store/ffffffffffffffffffffffffffffffff-sqlite-3.43.2.drv".into(),
1346                ),
1347                nar_hash: "sha256-sUu8vqpIoy7ZpnQPcwvQasNqX2jJOSXeEwd1yFtTukU=".into(),
1348                references: vec![
1349                    "/nix/store/ffffffffffffffffffffffffffffffff-zlib-1.3".into(),
1350                    "/nix/store/ffffffffffffffffffffffffffffffff-glibc-2.38-27".into(),
1351                ],
1352                registration_time: Utc.with_ymd_and_hms(2023, 11, 20, 15, 53, 20).unwrap(),
1353                nar_size: 1768960,
1354                ultimate: false,
1355                signatures: vec![
1356                   "cache.nixos.org-1:Efz+S0y30Eny+nbjeiS0vlUiEpmNbW+m1CiznlC5odPRpTfQUENj+AQcDsnEgvXmaTY9OqG0l5pMIBc6XAk6AQ==".into(),
1357                ],
1358                ca: None,
1359            },
1360        )
1361        .await
1362        .unwrap();
1363    }
1364    #[tokio::test]
1365    async fn test_write_pathinfo_ca() {
1366        let mut mock = Builder::new()
1367            .write(&0u64.to_le_bytes()) // deriver
1368            .write(&51u64.to_le_bytes()) // nar_hash
1369            .write(&pad_str::<56>(
1370                "sha256-1JmbR4NOsYNvgbJlqjp+4/bfm22IvhakiE1DXNfx78s=",
1371            ))
1372            .write(&5u64.to_le_bytes()) // references[]
1373            .write(&60u64.to_le_bytes()) // references[0]
1374            .write(&pad_str::<64>(
1375                "/nix/store/ffffffffffffffffffffffffffffffff-bash-5.2-p15.drv",
1376            ))
1377            .write(&58u64.to_le_bytes()) // references[1]
1378            .write(&pad_str::<64>(
1379                "/nix/store/ffffffffffffffffffffffffffffffff-curl-8.4.0.drv",
1380            ))
1381            .write(&54u64.to_le_bytes()) // references[2]
1382            .write(&pad_str::<56>(
1383                "/nix/store/ffffffffffffffffffffffffffffffff-builder.sh",
1384            ))
1385            .write(&60u64.to_le_bytes()) // references[3]
1386            .write(&pad_str::<64>(
1387                "/nix/store/ffffffffffffffffffffffffffffffff-stdenv-linux.drv",
1388            ))
1389            .write(&60u64.to_le_bytes()) // references[4]
1390            .write(&pad_str::<64>(
1391                "/nix/store/ffffffffffffffffffffffffffffffff-mirrors-list.drv",
1392            ))
1393            .write(&1700854586u64.to_le_bytes()) // registration_time
1394            .write(&3008u64.to_le_bytes()) // nar_size
1395            .write(&0u64.to_le_bytes()) // ultimate
1396            .write(&0u64.to_le_bytes()) // signatures[]
1397            .write(&64u64.to_le_bytes()) // ca
1398            .write(&pad_str::<64>(
1399                "text:sha256:0yjycizc8v9950dz9a69a7qlzcba9gl2gls8svi1g1i75xxf206d",
1400            ))
1401            .build();
1402        write_pathinfo(
1403            &mut mock,
1404            Proto(1, 35),
1405            &PathInfo {
1406                deriver: None,
1407                nar_hash: "sha256-1JmbR4NOsYNvgbJlqjp+4/bfm22IvhakiE1DXNfx78s=".into(),
1408                references: vec![
1409                    "/nix/store/ffffffffffffffffffffffffffffffff-bash-5.2-p15.drv".into(),
1410                    "/nix/store/ffffffffffffffffffffffffffffffff-curl-8.4.0.drv".into(),
1411                    "/nix/store/ffffffffffffffffffffffffffffffff-builder.sh".into(),
1412                    "/nix/store/ffffffffffffffffffffffffffffffff-stdenv-linux.drv".into(),
1413                    "/nix/store/ffffffffffffffffffffffffffffffff-mirrors-list.drv".into(),
1414                ],
1415                registration_time: Utc.with_ymd_and_hms(2023, 11, 24, 19, 36, 26).unwrap(),
1416                nar_size: 3008,
1417                ultimate: false,
1418                signatures: vec![],
1419                ca: Some("text:sha256:0yjycizc8v9950dz9a69a7qlzcba9gl2gls8svi1g1i75xxf206d".into()),
1420            },
1421        )
1422        .await
1423        .unwrap();
1424    }
1425
1426    // This test case was adapted from cppnix (LGPL license) commit
1427    // 91b6833686a6a6d9eac7f3f66393ec89ef1d3b57
1428    #[tokio::test]
1429    #[allow(non_snake_case)]
1430    async fn test_cppnix__src_libstore_worker_protocol__string() {
1431        let mut mock = Builder::new()
1432            .write(&[
1433                // cppnix ./tests/unit/libstore/data/common-protocol/string.bin
1434                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00,
1435                0x00, 0x00, 0x68, 0x69, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x00, 0x00,
1436                0x00, 0x00, 0x00, 0x00, 0x77, 0x68, 0x69, 0x74, 0x65, 0x20, 0x72, 0x61, 0x62, 0x62,
1437                0x69, 0x74, 0x00, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1438                0xe5, 0xa4, 0xa7, 0xe7, 0x99, 0xbd, 0xe5, 0x85, 0x94, 0x00, 0x00, 0x00, 0x00, 0x00,
1439                0x00, 0x00, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x6f, 0x68, 0x20, 0x6e,
1440                0x6f, 0x20, 0x00, 0x00,
1441            ])
1442            .build();
1443
1444        // cppnix tests/unit/libstore/worker-protocol.cc
1445        write_string(&mut mock, "").await.unwrap();
1446        write_string(&mut mock, "hi").await.unwrap();
1447        write_string(&mut mock, "white rabbit").await.unwrap();
1448        write_string(&mut mock, "大白兔").await.unwrap();
1449        write_string(&mut mock, "oh no \0\0\0 what was that!")
1450            .await
1451            .unwrap();
1452    }
1453
1454    #[tokio::test]
1455    async fn test_framedreader_empty() {
1456        let mut mock = Builder::new().read(&0u64.to_le_bytes()).build();
1457        let mut buf = Vec::new();
1458        let len = FramedReader::new(&mut mock)
1459            .read_to_end(&mut buf)
1460            .await
1461            .unwrap();
1462        assert_eq!(0, len);
1463        assert_eq!(0, buf.len());
1464    }
1465
1466    #[tokio::test]
1467    async fn test_framedreader_1f() {
1468        let mut mock = Builder::new()
1469            .read(&2u64.to_le_bytes())
1470            .read(&[1, 2])
1471            .read(&0u64.to_le_bytes())
1472            .build();
1473        let mut buf = Vec::new();
1474        let len = FramedReader::new(&mut mock)
1475            .read_to_end(&mut buf)
1476            .await
1477            .unwrap();
1478        assert_eq!(&[1, 2], &buf[..]);
1479        assert_eq!(2, len);
1480    }
1481
1482    #[tokio::test]
1483    async fn test_framedreader_2f() {
1484        let mut mock = Builder::new()
1485            .read(&2u64.to_le_bytes())
1486            .read(&[1, 2])
1487            .read(&4u64.to_le_bytes())
1488            .read(&[3, 4, 5, 6])
1489            .read(&0u64.to_le_bytes())
1490            .build();
1491        let mut buf = Vec::new();
1492        let len = FramedReader::new(&mut mock)
1493            .read_to_end(&mut buf)
1494            .await
1495            .unwrap();
1496        assert_eq!(&[0x01, 0x02, 0x03, 0x04, 0x05, 0x06], &buf[..]);
1497        assert_eq!(6, len);
1498    }
1499
1500    #[tokio::test]
1501    async fn test_framedreader_2f_overflow() {
1502        let mut mock = Builder::new()
1503            .read(&2u64.to_le_bytes())
1504            .read(&[1, 2])
1505            .read(&4u64.to_le_bytes())
1506            .read(&[3, 4, 5, 6])
1507            .read(&0u64.to_le_bytes())
1508            .build();
1509        let mut buf = [0u8; 2];
1510        let mut r = FramedReader::new(&mut mock);
1511        assert_eq!(2, r.read(&mut buf).await.unwrap());
1512        assert_eq!(&[1, 2], &buf[..]);
1513        assert_eq!(2, r.read(&mut buf).await.unwrap());
1514        assert_eq!(&[3, 4], &buf[..]);
1515        assert_eq!(2, r.read(&mut buf).await.unwrap());
1516        assert_eq!(&[5, 6], &buf[..]);
1517        assert_eq!(0, r.read(&mut buf).await.unwrap());
1518    }
1519}