1use crate::{
8 nix::Proto, BuildMode, BuildResult, BuildResultStatus, ClientSettings, Error, NixError,
9 PathInfo, Result, ResultExt, Stderr, StderrField, StderrResult, StderrStartActivity, Verbosity,
10};
11use async_stream::try_stream;
12use chrono::{DateTime, Utc};
13use futures::{future::OptionFuture, Future};
14use num_enum::{IntoPrimitive, TryFromPrimitive, TryFromPrimitiveError};
15use std::collections::HashMap;
16use std::fmt::Debug;
17use tap::{Tap, TapFallible};
18use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, ReadBuf};
19use tokio_stream::{Stream, StreamExt};
20use tracing::{instrument, trace};
21
/// First worker-protocol magic number; its four low bytes are the ASCII characters "nixc".
/// NOTE(review): presumably sent by the client to open the handshake — confirm against callers.
pub const WORKER_MAGIC_1: u64 = 0x6e697863;
/// Second worker-protocol magic number; its four low bytes are the ASCII characters "dxio".
/// NOTE(review): presumably the daemon's reply to `WORKER_MAGIC_1` — confirm against callers.
pub const WORKER_MAGIC_2: u64 = 0x6478696f;
26
/// Operation codes of the worker protocol, encoded on the wire as a `u64`.
///
/// The derives provide `u64 -> Op` (fallible, via `TryFrom`) and `Op -> u64` conversions;
/// `read_op` / `write_op` use them to move opcodes over a stream.
#[derive(Debug, TryFromPrimitive, IntoPrimitive)]
#[repr(u64)]
pub enum Op {
    IsValidPath = 1,
    HasSubstitutes = 3,
    QueryReferrers = 6,
    AddToStore = 7,
    BuildPaths = 9,
    EnsurePath = 10,
    AddTempRoot = 11,
    AddIndirectRoot = 12,
    SyncWithGC = 13,
    FindRoots = 14,
    SetOptions = 19,
    CollectGarbage = 20,
    QuerySubstitutablePathInfo = 21,
    QueryAllValidPaths = 23,
    QueryFailedPaths = 24,
    ClearFailedPaths = 25,
    QueryPathInfo = 26,
    QueryPathFromHashPart = 29,
    QuerySubstitutablePathInfos = 30,
    QueryValidPaths = 31,
    QuerySubstitutablePaths = 32,
    QueryValidDerivers = 33,
    OptimiseStore = 34,
    VerifyStore = 35,
    BuildDerivation = 36,
    AddSignatures = 37,
    NarFromPath = 38,
    AddToStoreNar = 39,
    QueryMissing = 40,
    QueryDerivationOutputMap = 41,
    RegisterDrvOutput = 42,
    QueryRealisation = 43,
    AddMultipleToStore = 44,
    AddBuildLog = 45,
    BuildPathsWithResults = 46,

    // NOTE(review): the three opcodes below are listed apart from the numerically ordered
    // ones above; presumably they are obsolete/deprecated operations — confirm.
    AddTextToStore = 8,
    QueryDerivationOutputs = 22,
    QueryDerivationOutputNames = 28,
}
87impl From<TryFromPrimitiveError<Op>> for Error {
88 fn from(value: TryFromPrimitiveError<Op>) -> Self {
89 Self::Invalid(format!("Op({:x})", value.number))
90 }
91}
92
/// Reader for a stream of length-prefixed frames layered over a borrowed reader.
///
/// Each frame starts with a little-endian `u64` byte count (read via `read_u64`) followed
/// by that many payload bytes.
#[derive(Debug)]
pub struct FramedReader<'r, R: AsyncReadExt + Unpin + Debug> {
    /// Underlying reader the frames are pulled from.
    r: &'r mut R,
    /// Payload bytes of the current frame that have not been read yet; 0 means the next
    /// read starts by fetching a new frame header.
    frame_len: usize,
}
102
103impl<'r, R: AsyncReadExt + Unpin + Debug> FramedReader<'r, R> {
104 pub fn new(r: &'r mut R) -> Self {
105 Self { r, frame_len: 0 }
106 }
107
108 pub async fn read_chunked(&mut self, buf: &mut ReadBuf<'_>) -> std::io::Result<()> {
109 if self.frame_len == 0 {
110 self.frame_len = read_u64(self.r)
111 .await?
112 .try_into()
113 .expect("u64 chunk length doesn't fit into usize");
114 trace!(self.frame_len, "read frame header");
115 }
116 if self.frame_len > 0 {
117 let chunk_len = self
118 .r
119 .read(buf.initialize_unfilled_to(std::cmp::min(self.frame_len, buf.remaining())))
120 .await?;
121 buf.advance(chunk_len);
122 self.frame_len = self
123 .frame_len
124 .checked_sub(chunk_len)
125 .expect("read more than chunk_len, somehow");
126 trace!(chunk_len, remaining = self.frame_len, "read frame chunk");
127 }
128 Ok(())
129 }
130}
131
132impl<'r, R: AsyncReadExt + Unpin + Debug> AsyncRead for FramedReader<'r, R> {
133 fn poll_read(
134 mut self: std::pin::Pin<&mut Self>,
135 cx: &mut std::task::Context<'_>,
136 buf: &mut tokio::io::ReadBuf<'_>,
137 ) -> std::task::Poll<std::io::Result<()>> {
138 let read = self.read_chunked(buf);
139 tokio::pin!(read);
140 read.as_mut().as_mut().poll(cx)
141 }
142}
143
144#[instrument(skip_all, level = "trace")]
145pub async fn copy_to_framed<R: AsyncReadExt + Unpin, W: AsyncWriteExt + Unpin>(
146 r: &mut R,
147 w: &mut W,
148 buf: &mut [u8],
149) -> Result<()> {
150 loop {
151 let len = r.read(buf).await?;
152 write_u64(w, len as u64).await?;
153 if len == 0 {
154 trace!("Done");
155 return Ok(());
156 }
157 w.write_all(&buf[..len]).await?;
158 trace!(len, "Copied frame...");
159 }
160}
161
162#[instrument(skip(r), level = "trace")]
164pub async fn read_u64<R: AsyncReadExt + Unpin>(r: &mut R) -> std::io::Result<u64> {
165 Ok(r.read_u64_le().await.tap_ok(|v| trace!(v, "<-"))?)
166}
167#[instrument(skip(w, v), level = "trace")]
169pub async fn write_u64<W: AsyncWriteExt + Unpin>(w: &mut W, v: u64) -> std::io::Result<()> {
170 Ok(w.write_u64_le(v.tap(|v| trace!(v, "->"))).await?)
171}
172
173#[instrument(skip(r), level = "trace")]
175pub async fn read_bool<R: AsyncReadExt + Unpin>(r: &mut R) -> std::io::Result<bool> {
176 Ok(read_u64(r)
177 .await
178 .map(|v| v > 0)
179 .tap_ok(|v| trace!(v, "<-"))?)
180}
181#[instrument(skip(w, v), level = "trace")]
183pub async fn write_bool<W: AsyncWriteExt + Unpin>(w: &mut W, v: bool) -> std::io::Result<()> {
184 Ok(write_u64(w, if v { 1 } else { 0 }.tap(|v| trace!(v, "->"))).await?)
185}
186
187#[instrument(skip(r), level = "trace")]
189pub async fn read_datetime<R: AsyncReadExt + Unpin>(r: &mut R) -> Result<DateTime<Utc>> {
190 read_u64(r).await.map_err(Into::into).and_then(|ts| {
191 DateTime::from_timestamp(ts as i64, 0)
192 .ok_or_else(|| Error::Invalid(ts.to_string()))
193 .tap_ok(|dt| trace!(?dt, "<-"))
194 })
195}
196#[instrument(skip(w), level = "trace")]
198pub async fn write_datetime<W: AsyncWriteExt + Unpin>(w: &mut W, dt: DateTime<Utc>) -> Result<()> {
199 Ok(write_u64(
200 w,
201 dt.timestamp()
202 .tap(|dt| trace!(?dt, "->"))
203 .try_into()
204 .map_err(|err| Error::Invalid(format!("DateTime({}): {}", dt.to_string(), err)))?,
205 )
206 .await?)
207}
208
209#[instrument(skip(r), level = "trace")]
211pub async fn read_proto<R: AsyncReadExt + Unpin>(r: &mut R) -> Result<Proto> {
212 Ok(read_u64(r)
213 .await
214 .map(|raw| raw.into())
215 .tap_ok(|v| trace!(?v, "<-"))?)
216}
217#[instrument(skip(w, v), level = "trace")]
219pub async fn write_proto<W: AsyncWriteExt + Unpin>(w: &mut W, v: Proto) -> Result<()> {
220 Ok(write_u64(w, v.tap(|v| trace!(?v, "->")).into()).await?)
221}
222
223#[instrument(skip(r), level = "trace")]
225pub async fn read_op<R: AsyncReadExt + Unpin>(r: &mut R) -> Result<Op> {
226 Ok(read_u64(r).await?.try_into().tap_ok(|v| trace!(?v, "<-"))?)
227}
228#[instrument(skip(w, v), level = "trace")]
230pub async fn write_op<W: AsyncWriteExt + Unpin>(w: &mut W, v: Op) -> Result<()> {
231 Ok(write_u64(w, v.tap(|v| trace!(?v, "->")).into()).await?)
232}
233
234#[instrument(skip(r), level = "trace")]
236pub async fn read_verbosity<R: AsyncReadExt + Unpin>(r: &mut R) -> Result<Verbosity> {
237 Ok(read_u64(r).await?.try_into().tap_ok(|v| trace!(?v, "<-"))?)
238}
239#[instrument(skip(w, v), level = "trace")]
241pub async fn write_verbosity<W: AsyncWriteExt + Unpin>(w: &mut W, v: Verbosity) -> Result<()> {
242 Ok(write_u64(w, v.tap(|v| trace!(?v, "->")).into()).await?)
243}
244
245#[instrument(skip(r), level = "trace")]
247pub async fn read_build_mode<R: AsyncReadExt + Unpin>(r: &mut R) -> Result<BuildMode> {
248 Ok(read_u64(r).await?.try_into().tap_ok(|v| trace!(?v, "<-"))?)
249}
250#[instrument(skip(w, v), level = "trace")]
252pub async fn write_build_mode<W: AsyncWriteExt + Unpin>(
253 w: &mut W,
254 v: BuildMode,
255) -> std::io::Result<()> {
256 write_u64(w, v.tap(|v| trace!(?v, "->")).into()).await
257}
258
259#[instrument(skip(r), level = "trace")]
261pub async fn read_build_result_status<R: AsyncReadExt + Unpin>(
262 r: &mut R,
263) -> Result<BuildResultStatus> {
264 Ok(read_u64(r).await?.try_into().tap_ok(|v| trace!(?v, "<-"))?)
265}
266#[instrument(skip(w, v), level = "trace")]
268pub async fn write_build_result_status<W: AsyncWriteExt + Unpin>(
269 w: &mut W,
270 v: BuildResultStatus,
271) -> std::io::Result<()> {
272 write_u64(w, v.tap(|v| trace!(?v, "->")).into()).await
273}
274
275#[instrument(skip(r), level = "trace")]
279pub async fn read_string<R: AsyncReadExt + Unpin>(r: &mut R) -> std::io::Result<String> {
280 let len = read_u64(r).await? as usize;
281 let padded_len = len + if len % 8 > 0 { 8 - (len % 8) } else { 0 };
282 if padded_len <= 1024 {
283 let mut buf = [0u8; 1024];
284 r.read_exact(&mut buf[..padded_len]).await?;
285 Ok(String::from_utf8_lossy(&buf[..len]).to_string())
286 } else {
287 let mut buf = vec![0u8; padded_len];
288 r.read_exact(&mut buf[..padded_len]).await?;
289 Ok(String::from_utf8_lossy(&buf[..len]).to_string())
290 }
291 .tap_ok(|v| trace!(v, "<-"))
292}
293
294#[instrument(skip(w, s), level = "trace")]
296pub async fn write_string<W: AsyncWriteExt + Unpin, S: AsRef<str> + Debug>(
297 w: &mut W,
298 s: S,
299) -> std::io::Result<()> {
300 trace!(v=?s,"->");
301 let truncated = s.as_ref().split(|b| b == '\0').next().ok_or_else(|| {
302 std::io::Error::new(
303 std::io::ErrorKind::UnexpectedEof,
304 Error::Invalid("slice::split() returned an empty iterator".to_string()),
305 )
306 })?;
307 let b = truncated.as_bytes();
308 write_u64(w, b.len().try_into().unwrap()).await?;
309 if b.len() > 0 {
310 w.write_all(b).await?;
311 trace!(v = truncated, "->");
312 if b.len() % 8 > 0 {
313 let pad_buf = [0u8; 7];
314 let pad_len = 8 - (b.len() % 8);
315 w.write_all(&pad_buf[..pad_len]).await?;
316 trace!(pad_len, "[ padding ]");
317 }
318 }
319 Ok(())
320}
321
322#[instrument(skip(r), level = "trace")]
325pub fn read_strings<R: AsyncReadExt + Unpin>(r: &mut R) -> impl Stream<Item = Result<String>> + '_ {
326 try_stream! {
327 let count = read_u64(r).await.with_field("<count>")? as usize;
328 for _ in 0..count {
329 yield read_string(r).await?;
330 }
331 }
332}
333#[instrument(skip(w, si), level = "trace")]
335pub async fn write_strings<W: AsyncWriteExt + Unpin, I>(w: &mut W, si: I) -> std::io::Result<()>
336where
337 I: IntoIterator + Send,
338 I::IntoIter: ExactSizeIterator + Send,
339 I::Item: AsRef<str> + Send + Sync,
340{
341 let si = si.into_iter();
342 write_u64(w, si.len().try_into().unwrap()).await?;
343 for s in si {
344 write_string(w, s.as_ref()).await?;
345 }
346 Ok(())
347}
348
349#[instrument(skip(r), level = "trace")]
351pub async fn read_error<R: AsyncReadExt + Unpin>(r: &mut R) -> Result<NixError> {
352 match read_string(r).await {
353 Err(err) => Err(err.into()),
354 Ok(s) if s.as_str() == "Error" => Ok(()),
355 Ok(s) => Err(Error::Invalid(format!("expected 'Error', got '{}'", s))),
356 }
357 .with_field("Error.__unused_type_1")?;
358
359 let level = read_verbosity(r).await.with_field("Error.level")?;
360
361 match read_string(r).await {
362 Err(err) => Err(err.into()),
363 Ok(s) if s.as_str() == "Error" => Ok(()),
364 Ok(s) => Err(Error::Invalid(format!("expected 'Error', got '{}'", s))),
365 }
366 .with_field("Error.__unused_type_2")?;
367
368 let msg = read_string(r).await.with_field("Error.msg")?;
369
370 read_u64(r).await.with_field("Error.__unused_err_pos")?;
371
372 let num_traces = read_u64(r).await.with_field("Error.traces[].<count>")?;
373 let mut traces = Vec::with_capacity(num_traces.try_into().unwrap_or_default());
374 for _ in 0..num_traces {
375 read_u64(r)
376 .await
377 .with_field("Error.traces[].__unused_pos")?;
378 traces.push(read_string(r).await.with_field("Error.traces[].hint")?);
379 }
380
381 Ok(NixError { level, msg, traces })
382}
383
384#[instrument(skip(w, v), level = "trace")]
386pub async fn write_error<W: AsyncWriteExt + Unpin>(w: &mut W, v: NixError) -> Result<()> {
387 write_string(w, "Error")
388 .await
389 .with_field("Error.__unused_type_1")?;
390
391 write_verbosity(w, v.level)
392 .await
393 .with_field("Error.level")?;
394
395 write_string(w, "Error")
396 .await
397 .with_field("Error.__unused_type_2")?;
398
399 write_string(w, v.msg).await.with_field("Error.msg")?;
400
401 write_u64(w, 0).await.with_field("Error.__unused_err_pos")?;
402
403 write_u64(w, v.traces.len() as u64)
404 .await
405 .with_field("Error.traces[].<count>")?;
406 for trace in v.traces.iter() {
407 write_u64(w, 0)
408 .await
409 .with_field("Error.traces[].__unused_pos")?;
410 write_string(w, trace)
411 .await
412 .with_field("Error.traces[].hint")?;
413 }
414
415 Ok(())
416}
417
418#[instrument(skip(r), level = "trace")]
419pub async fn read_build_result<R: AsyncReadExt + Unpin>(
420 r: &mut R,
421 proto: Proto,
422) -> Result<BuildResult> {
423 let status = read_build_result_status(r)
424 .await
425 .with_field("BuildResult.status")?;
426 let error_msg = read_string(r).await.with_field("BuildResult.error_msg")?;
427
428 let mut br = BuildResult {
429 status,
430 error_msg,
431 times_built: 0,
432 is_non_deterministic: false,
433 start_time: DateTime::default(),
434 stop_time: DateTime::default(),
435 built_outputs: HashMap::default(),
436 };
437
438 if proto >= Proto(1, 29) {
439 br.times_built = read_u64(r).await.with_field("BuildResult.times_built")?;
440 br.is_non_deterministic = read_bool(r)
441 .await
442 .with_field("BuildResult.is_non_deterministic")?;
443 br.start_time = read_datetime(r)
444 .await
445 .with_field("BuildResult.start_time")?;
446 br.stop_time = read_datetime(r).await.with_field("BuildResult.stop_time")?;
447 }
448 if proto >= Proto(1, 28) {
449 let count = read_u64(r)
450 .await
451 .with_field("BuildResult.built_outputs.<count>")? as usize;
452 for _ in 0..count {
453 let name = read_string(r)
454 .await
455 .with_field("BuildResult.built_outputs[].name")?;
456 let path = read_string(r)
457 .await
458 .with_field("BuildResult.built_outputs[].path")?;
459 br.built_outputs.insert(name, path);
460 }
461 }
462
463 Ok(br)
464}
465
466#[instrument(skip(w), level = "trace")]
467pub async fn write_build_result<W: AsyncWriteExt + Unpin>(
468 w: &mut W,
469 result: &BuildResult,
470 proto: Proto,
471) -> Result<()> {
472 write_build_result_status(w, result.status)
473 .await
474 .with_field("BuildResult.status")?;
475 write_string(w, &result.error_msg)
476 .await
477 .with_field("BuildResult.error_msg")?;
478
479 if proto >= Proto(1, 29) {
480 write_u64(w, result.times_built)
481 .await
482 .with_field("BuildResult.times_built")?;
483 write_bool(w, result.is_non_deterministic)
484 .await
485 .with_field("BuildResult.is_non_deterministic")?;
486 write_datetime(w, result.start_time)
487 .await
488 .with_field("BuildResult.start_time")?;
489 write_datetime(w, result.stop_time)
490 .await
491 .with_field("BuildResult.stop_time")?;
492 }
493 if proto >= Proto(1, 28) {
494 write_u64(w, result.built_outputs.len() as u64)
495 .await
496 .with_field("BuildResult.built_outputs.<count>")?;
497 for (name, path) in &result.built_outputs {
498 write_string(w, name)
499 .await
500 .with_field("BuildResult.built_outputs[].name")?;
501 write_string(w, path)
502 .await
503 .with_field("BuildResult.built_outputs[].path")?;
504 }
505 }
506
507 Ok(())
508}
509
/// Tag that introduces each message of the stderr/log stream, encoded as a `u64`.
///
/// `read_stderr` reads this tag first and then the payload belonging to the variant.
#[derive(Debug, TryFromPrimitive, IntoPrimitive)]
#[repr(u64)]
pub enum StderrKind {
    /// Followed by one string; value bytes spell ASCII "olmg".
    Next = 0x6f6c6d67,
    /// No payload; `read_stderr` returns `None` for it. Value bytes spell ASCII "alts".
    Last = 0x616c7473,
    /// Followed by an error record (see `read_error`); value bytes spell ASCII "cxtp".
    Error = 0x63787470,
    /// Followed by a start-activity record; value bytes spell ASCII "STRT".
    StartActivity = 0x53545254,
    /// Followed by a `u64` activity id; value bytes spell ASCII "STOP".
    StopActivity = 0x53544f50,
    /// Followed by a result record; value bytes spell ASCII "RSLT".
    Result = 0x52534c54,
}
520
521#[instrument(skip(r), level = "trace")]
522pub async fn read_stderr<R: AsyncReadExt + Unpin>(r: &mut R) -> Result<Option<Stderr>> {
523 let kind = StderrKind::try_from(read_u64(r).await?)
524 .map_err(|TryFromPrimitiveError { number }| {
525 Error::Invalid(format!("Stderr<{:#x}>", number))
526 })?
527 .tap(|kind| trace!(?kind, "<-"));
528
529 match kind {
530 StderrKind::Last => Ok(None),
531 StderrKind::Next => Ok(Some(Stderr::Next(read_string(r).await?))),
532 StderrKind::Error => Ok(Some(Stderr::Error(read_error(r).await?))),
533 StderrKind::StartActivity => Ok(Some(Stderr::StartActivity(
534 read_stderr_start_activity(r).await?,
535 ))),
536 StderrKind::StopActivity => Ok(Some(Stderr::StopActivity {
537 act_id: read_u64(r).await?,
538 })),
539 StderrKind::Result => Ok(Some(Stderr::Result(read_stderr_result(r).await?))),
540 }
541 .tap_ok(|stderr| trace!(?stderr, "<-"))
542}
543#[instrument(skip(r), level = "trace")]
544pub async fn read_stderr_start_activity<R: AsyncReadExt + Unpin>(
545 r: &mut R,
546) -> Result<StderrStartActivity> {
547 Ok(StderrStartActivity {
548 act_id: read_u64(r).await?,
549 level: read_verbosity(r).await?,
550 kind: read_u64(r).await?.try_into()?,
551 s: read_string(r).await?,
552 fields: read_stderr_fields(r).await?,
553 parent_id: read_u64(r).await?,
554 }
555 .tap(|act| trace!(?act, "<-")))
556}
557#[instrument(skip(r), level = "trace")]
558pub async fn read_stderr_result<R: AsyncReadExt + Unpin>(r: &mut R) -> Result<StderrResult> {
559 Ok(StderrResult {
560 act_id: read_u64(r).await?,
561 kind: read_u64(r).await?.try_into()?,
562 fields: read_stderr_fields(r).await?,
563 }
564 .tap(|res| trace!(?res, "<-")))
565}
566#[instrument(skip(r), level = "trace")]
567pub async fn read_stderr_fields<R: AsyncReadExt + Unpin>(r: &mut R) -> Result<Vec<StderrField>> {
568 let count = read_u64(r)
569 .await
570 .with_field("StartActivity.fields.<count>")?
571 .tap(|count| trace!(count, "fields[].<count>")) as usize;
572 let mut fields = Vec::with_capacity(count);
573 for n in 0..count {
574 fields.push(
575 match read_u64(r)
576 .await
577 .with_field("StartActivity.fields[].<type>")?
578 {
579 0 => Ok(StderrField::Int(read_u64(r).await?)),
580 1 => Ok(StderrField::String(read_string(r).await?)),
581 v => Err(Error::Invalid(format!("<type>({})", v))),
582 }
583 .with_field("StartActivity.fields[]")?
584 .tap(|v| trace!(n, count, ?v, "fields[]")),
585 )
586 }
587 Ok(fields)
588}
589
590#[instrument(skip(w), level = "trace")]
591pub async fn write_stderr<W: AsyncWriteExt + Unpin>(w: &mut W, v: Option<Stderr>) -> Result<()> {
592 trace!(?v, "->");
593 match v {
594 None => write_u64(w, StderrKind::Last.into()).await?,
595 Some(Stderr::Next(s)) => {
596 write_u64(w, StderrKind::Next.into()).await?;
597 write_string(w, s).await?;
598 }
599 Some(Stderr::Error(err)) => {
600 write_u64(w, StderrKind::Error.into()).await?;
601 write_error(w, err).await?;
602 }
603 Some(Stderr::StartActivity(start)) => {
604 write_u64(w, StderrKind::StartActivity.into()).await?;
605 write_stderr_start_activity(w, start).await?;
606 }
607 Some(Stderr::StopActivity { act_id }) => {
608 write_u64(w, StderrKind::StopActivity.into()).await?;
609 write_u64(w, act_id).await?;
610 }
611 Some(Stderr::Result(res)) => {
612 write_u64(w, StderrKind::Result.into()).await?;
613 write_stderr_result(w, res).await?;
614 }
615 }
616 Ok(())
617}
618#[instrument(skip(w, v), level = "trace")]
619pub async fn write_stderr_start_activity<W: AsyncWriteExt + Unpin>(
620 w: &mut W,
621 v: StderrStartActivity,
622) -> Result<()> {
623 trace!(?v, "->");
624 write_u64(w, v.act_id).await?;
625 write_verbosity(w, v.level).await?;
626 write_u64(w, v.kind.into()).await?;
627 write_string(w, v.s).await?;
628 write_stderr_fields(w, v.fields).await?;
629 write_u64(w, v.parent_id).await?;
630 Ok(())
631}
632#[instrument(skip(w, v), level = "trace")]
633pub async fn write_stderr_result<W: AsyncWriteExt + Unpin>(
634 w: &mut W,
635 v: StderrResult,
636) -> Result<()> {
637 trace!(?v, "->");
638 write_u64(w, v.act_id).await?;
639 write_u64(w, v.kind.into()).await?;
640 write_stderr_fields(w, v.fields).await?;
641 Ok(())
642}
643#[instrument(skip(w, vs), level = "trace")]
644pub async fn write_stderr_fields<W: AsyncWriteExt + Unpin, I>(w: &mut W, vs: I) -> Result<()>
645where
646 I: IntoIterator + Send,
647 I::IntoIter: ExactSizeIterator<Item = StderrField> + Send,
648{
649 let vs = vs.into_iter();
650 write_u64(w, vs.len() as u64)
651 .await
652 .with_field("StartActivity.fields.<count>")?;
653 for field in vs {
654 match field {
655 StderrField::Int(v) => {
656 write_u64(w, 0)
657 .await
658 .with_field("StartActivity.fields[].<type>")?;
659 write_u64(w, v).await.with_field("StartActivity.fields[]")?;
660 }
661 StderrField::String(v) => {
662 write_u64(w, 0)
663 .await
664 .with_field("StartActivity.fields[].<type>")?;
665 write_string(w, v)
666 .await
667 .with_field("StartActivity.fields[]")?;
668 }
669 }
670 }
671 Ok(())
672}
673
674#[instrument(skip(r), level = "trace")]
676pub async fn read_client_settings<R: AsyncReadExt + Unpin>(
677 r: &mut R,
678 proto: Proto,
679) -> Result<ClientSettings> {
680 let keep_failed = read_bool(r)
681 .await
682 .with_field("ClientSettings.keep_failed")?;
683 let keep_going = read_bool(r).await.with_field("ClientSettings.keep_going")?;
684 let try_fallback = read_bool(r)
685 .await
686 .with_field("ClientSettings.try_fallback")?;
687 let verbosity = read_verbosity(r)
688 .await
689 .with_field("ClientSettings.verbosity")?;
690 let max_build_jobs = read_u64(r)
691 .await
692 .with_field("ClientSettings.max_build_jobs")?;
693 let max_silent_time = read_u64(r)
694 .await
695 .with_field("ClientSettings.max_silent_time")?;
696 read_u64(r)
697 .await
698 .with_field("ClientSettings.__obsolete_use_build_hook")?;
699 let verbose_build = read_verbosity(r)
700 .await
701 .map(|v| v == Verbosity::Error)
702 .with_field("ClientSettings.verbose_build")?;
703 read_u64(r)
704 .await
705 .with_field("ClientSettings.__obsolete_log_type")?;
706 read_u64(r)
707 .await
708 .with_field("ClientSettings.__obsolete_print_build_trace")?;
709 let build_cores = read_u64(r).await.with_field("ClientSettings.build_cores")?;
710 let use_substitutes = read_bool(r)
711 .await
712 .with_field("ClientSettings.use_substitutes")?;
713
714 let overrides = if proto >= Proto(1, 12) {
715 let count = read_u64(r)
716 .await
717 .with_field("ClientSettings.overrides.<count>")? as usize;
718 let mut overrides = HashMap::with_capacity(count as usize);
719 for _ in 0..count {
720 let key = read_string(r)
721 .await
722 .with_field("ClientSettings.overrides[].key")?;
723 let value = read_string(r)
724 .await
725 .with_field("ClientSettings.overrides[].value")?;
726 overrides.insert(key, value);
727 }
728 overrides
729 } else {
730 HashMap::with_capacity(0)
731 };
732
733 Ok(ClientSettings {
734 keep_failed,
735 keep_going,
736 try_fallback,
737 verbosity,
738 max_build_jobs,
739 max_silent_time,
740 verbose_build,
741 build_cores,
742 use_substitutes,
743 overrides,
744 })
745}
746#[instrument(skip(w, cs), level = "trace")]
748pub async fn write_client_settings<W: AsyncWriteExt + Unpin>(
749 w: &mut W,
750 proto: Proto,
751 cs: &ClientSettings,
752) -> Result<()> {
753 write_bool(w, cs.keep_failed)
754 .await
755 .with_field("ClientSettings.keep_failed")?;
756 write_bool(w, cs.keep_going)
757 .await
758 .with_field("ClientSettings.keep_going")?;
759 write_bool(w, cs.try_fallback)
760 .await
761 .with_field("ClientSettings.try_fallback")?;
762
763 write_verbosity(w, cs.verbosity)
764 .await
765 .with_field("ClientSettings.verbosity")?;
766 write_u64(w, cs.max_build_jobs)
767 .await
768 .with_field("ClientSettings.max_build_jobs")?;
769 write_u64(w, cs.max_silent_time)
770 .await
771 .with_field("ClientSettings.max_silent_time")?;
772 write_u64(w, 0)
773 .await
774 .with_field("ClientSettings.__obsolete_use_build_hook")?;
775 write_verbosity(
776 w,
777 if cs.verbose_build {
778 Verbosity::Error
779 } else {
780 Verbosity::Vomit
781 },
782 )
783 .await
784 .with_field("ClientSettings.verbose_build")?;
785 write_u64(w, 0)
786 .await
787 .with_field("ClientSettings.__obsolete_log_type")?;
788 write_u64(w, 0)
789 .await
790 .with_field("ClientSettings.__obsolete_print_build_trace")?;
791 write_u64(w, cs.build_cores)
792 .await
793 .with_field("ClientSettings.build_cores")?;
794 write_bool(w, cs.use_substitutes)
795 .await
796 .with_field("ClientSettings.use_substitutes")?;
797
798 if proto >= Proto(1, 12) {
799 write_u64(w, cs.overrides.len() as u64)
800 .await
801 .with_field("ClientSettings.overrides.<count>")?;
802 for (key, value) in cs.overrides.iter() {
803 write_string(w, key)
804 .await
805 .with_field("ClientSettings.overrides[].key")?;
806 write_string(w, value)
807 .await
808 .with_field("ClientSettings.overrides[].value")?;
809 }
810 }
811
812 Ok(())
813}
814
815#[instrument(skip(r), level = "trace")]
817pub async fn read_pathinfo<R: AsyncReadExt + Unpin>(r: &mut R, proto: Proto) -> Result<PathInfo> {
818 let deriver = read_string(r)
819 .await
820 .map(|s| (!s.is_empty()).then_some(s)) .with_field("PathInfo.deriver")?;
822 let nar_hash = read_string(r).await.with_field("PathInfo.nar_hash")?;
823 let references = read_strings(r)
824 .collect::<Result<Vec<_>>>()
825 .await
826 .with_field("PathInfo.deriver")?;
827 let registration_time = read_datetime(r)
828 .await
829 .with_field("PathInfo.registration_time")?;
830 let nar_size = read_u64(r).await.with_field("PathInfo.nar_size")?;
831
832 let ultimate = OptionFuture::from(proto.since(16).then(|| read_bool(r)))
833 .await
834 .transpose()
835 .with_field("PathInfo.ultimate")?
836 .unwrap_or_default();
837 let signatures = OptionFuture::from(proto.since(16).then(|| read_strings(r).collect()))
838 .await
839 .transpose()
840 .with_field("PathInfo.signatures")?
841 .unwrap_or_default();
842 let ca = OptionFuture::from(proto.since(16).then(|| read_string(r)))
843 .await
844 .transpose()
845 .with_field("PathInfo.ca")?
846 .and_then(|s| (!s.is_empty()).then_some(s)); Ok(PathInfo {
849 deriver,
850 nar_hash,
851 references,
852 registration_time,
853 nar_size,
854 ultimate,
855 signatures,
856 ca,
857 })
858}
859#[instrument(skip(w, pi), level = "trace")]
861pub async fn write_pathinfo<W: AsyncWriteExt + Unpin>(
862 w: &mut W,
863 proto: Proto,
864 pi: &PathInfo,
865) -> Result<()> {
866 write_string(w, pi.deriver.as_ref().map(|s| s.as_str()).unwrap_or(""))
867 .await
868 .with_field("PathInfo.deriver")?;
869 write_string(w, pi.nar_hash.as_str())
870 .await
871 .with_field("PathInfo.nar_hash")?;
872 write_strings(w, &pi.references)
873 .await
874 .with_field("PathInfo.deriver")?;
875 write_u64(w, pi.registration_time.timestamp().try_into().unwrap())
876 .await
877 .with_field("PathInfo.registration_time")?;
878 write_u64(w, pi.nar_size)
879 .await
880 .with_field("PathInfo.nar_size")?;
881
882 if proto.since(16) {
883 write_bool(w, pi.ultimate)
884 .await
885 .with_field("PathInfo.ultimate")?;
886 write_strings(w, &pi.signatures)
887 .await
888 .with_field("PathInfo.signatures")?;
889 write_string(w, &pi.ca.as_ref().map(|s| s.as_str()).unwrap_or(""))
890 .await
891 .with_field("PathInfo.ca")?;
892 }
893 Ok(())
894}
895
896#[cfg(test)]
897mod tests {
898 use super::*;
899 use chrono::{TimeZone, Utc};
900 use tokio_stream::StreamExt;
901 use tokio_test::io::Builder;
902
/// Copies `s` into the front of a zero-filled array of `L` bytes.
///
/// Panics if `L` is not a multiple of 8 or if `s` is longer than `L` bytes.
fn pad_str<const L: usize>(s: &str) -> [u8; L] {
    assert!(L % 8 == 0, "{} is not aligned to 8", L);
    let bytes = s.as_bytes();
    let mut padded = [0u8; L];
    padded[..bytes.len()].copy_from_slice(bytes);
    padded
}
909
/// An empty input produces only the terminating zero-length frame.
#[tokio::test]
async fn test_copy_to_framed_empty() {
    let mut r = Builder::new().read(&[]).build();
    let mut w = Builder::new().write(&0u64.to_le_bytes()).build();
    let mut buf = [0u8; 64];
    copy_to_framed(&mut r, &mut w, &mut buf).await.unwrap();
}
917
/// A single read becomes one frame (length + payload) followed by the terminator.
#[tokio::test]
async fn test_copy_to_framed_1() {
    let mut r = Builder::new().read(&[1, 2, 3, 4]).build();
    let mut w = Builder::new()
        .write(&4u64.to_le_bytes())
        .write(&[1, 2, 3, 4])
        .write(&0u64.to_le_bytes())
        .build();
    let mut buf = [0u8; 64];
    copy_to_framed(&mut r, &mut w, &mut buf).await.unwrap();
}
929
/// Two separate reads from the source become two separate frames.
#[tokio::test]
async fn test_copy_to_framed_2reads() {
    let mut r = Builder::new()
        .read(&[1, 2, 3, 4])
        .read(&[5, 6, 7, 8, 9, 10])
        .build();
    let mut w = Builder::new()
        .write(&4u64.to_le_bytes())
        .write(&[1, 2, 3, 4])
        .write(&6u64.to_le_bytes())
        .write(&[5, 6, 7, 8, 9, 10])
        .write(&0u64.to_le_bytes())
        .build();
    let mut buf = [0u8; 64];
    copy_to_framed(&mut r, &mut w, &mut buf).await.unwrap();
}
947
/// A 2-byte copy buffer splits a 5-byte input into frames of 2, 2 and 1 bytes.
#[tokio::test]
async fn test_copy_to_framed_3buffers() {
    let mut r = Builder::new().read(&[1, 2, 3, 4, 5]).build();
    let mut w = Builder::new()
        .write(&2u64.to_le_bytes())
        .write(&[1, 2])
        .write(&2u64.to_le_bytes())
        .write(&[3, 4])
        .write(&1u64.to_le_bytes())
        .write(&[5])
        .write(&0u64.to_le_bytes())
        .build();
    let mut buf = [0u8; 2];
    copy_to_framed(&mut r, &mut w, &mut buf).await.unwrap();
}
964
/// `read_u64` decodes eight little-endian bytes.
#[tokio::test]
async fn test_read_u64() {
    let mut mock = Builder::new().read(&1234567890u64.to_le_bytes()).build();
    assert_eq!(1234567890u64, read_u64(&mut mock).await.unwrap());
}
/// `write_u64` emits eight little-endian bytes.
#[tokio::test]
async fn test_write_u64() {
    let mut mock = Builder::new().write(&1234567890u64.to_le_bytes()).build();
    write_u64(&mut mock, 1234567890).await.unwrap();
}
976
/// A wire value of 0 decodes to `false`.
#[tokio::test]
async fn test_read_bool_0() {
    let mut mock = Builder::new().read(&0u64.to_le_bytes()).build();
    assert_eq!(false, read_bool(&mut mock).await.unwrap());
}
/// A wire value of 1 decodes to `true`.
#[tokio::test]
async fn test_read_bool_1() {
    let mut mock = Builder::new().read(&1u64.to_le_bytes()).build();
    assert_eq!(true, read_bool(&mut mock).await.unwrap());
}
/// Any non-zero wire value (here 2) decodes to `true`.
#[tokio::test]
async fn test_read_bool_2() {
    let mut mock = Builder::new().read(&2u64.to_le_bytes()).build();
    assert_eq!(true, read_bool(&mut mock).await.unwrap());
}
993
/// `false` is written as the `u64` value 0.
#[tokio::test]
async fn test_write_bool_false() {
    let mut mock = Builder::new().write(&0u64.to_le_bytes()).build();
    write_bool(&mut mock, false).await.unwrap();
}
/// `true` is written as the `u64` value 1.
#[tokio::test]
async fn test_write_bool_true() {
    let mut mock = Builder::new().write(&1u64.to_le_bytes()).build();
    write_bool(&mut mock, true).await.unwrap();
}
1004
/// The lowest byte (34) decodes to the second `Proto` component and the next byte (12)
/// to the first.
#[tokio::test]
async fn test_read_proto() {
    let mut mock = Builder::new().read(&[34, 12, 0, 0, 0, 0, 0, 0]).build();
    assert_eq!(Proto(12, 34), read_proto(&mut mock).await.unwrap());
}
/// `Proto(12, 34)` is written with 34 in the lowest byte and 12 in the next one.
#[tokio::test]
async fn test_write_proto() {
    let mut mock = Builder::new().write(&[34, 12, 0, 0, 0, 0, 0, 0]).build();
    write_proto(&mut mock, Proto(12, 34)).await.unwrap();
}
1017
/// Wire values 0 through 7 decode to `Error` through `Vomit`, in that order.
#[tokio::test]
async fn test_read_verbosity() {
    let mut m = Builder::new()
        .read(&0u64.to_le_bytes())
        .read(&1u64.to_le_bytes())
        .read(&2u64.to_le_bytes())
        .read(&3u64.to_le_bytes())
        .read(&4u64.to_le_bytes())
        .read(&5u64.to_le_bytes())
        .read(&6u64.to_le_bytes())
        .read(&7u64.to_le_bytes())
        .build();
    assert_eq!(Verbosity::Error, read_verbosity(&mut m).await.unwrap());
    assert_eq!(Verbosity::Warn, read_verbosity(&mut m).await.unwrap());
    assert_eq!(Verbosity::Notice, read_verbosity(&mut m).await.unwrap());
    assert_eq!(Verbosity::Info, read_verbosity(&mut m).await.unwrap());
    assert_eq!(Verbosity::Talkative, read_verbosity(&mut m).await.unwrap());
    assert_eq!(Verbosity::Chatty, read_verbosity(&mut m).await.unwrap());
    assert_eq!(Verbosity::Debug, read_verbosity(&mut m).await.unwrap());
    assert_eq!(Verbosity::Vomit, read_verbosity(&mut m).await.unwrap());
}
/// `Error` through `Vomit` are written as the wire values 0 through 7, in that order.
#[tokio::test]
async fn test_write_verbosity() {
    let mut m = Builder::new()
        .write(&0u64.to_le_bytes())
        .write(&1u64.to_le_bytes())
        .write(&2u64.to_le_bytes())
        .write(&3u64.to_le_bytes())
        .write(&4u64.to_le_bytes())
        .write(&5u64.to_le_bytes())
        .write(&6u64.to_le_bytes())
        .write(&7u64.to_le_bytes())
        .build();
    write_verbosity(&mut m, Verbosity::Error).await.unwrap();
    write_verbosity(&mut m, Verbosity::Warn).await.unwrap();
    write_verbosity(&mut m, Verbosity::Notice).await.unwrap();
    write_verbosity(&mut m, Verbosity::Info).await.unwrap();
    write_verbosity(&mut m, Verbosity::Talkative).await.unwrap();
    write_verbosity(&mut m, Verbosity::Chatty).await.unwrap();
    write_verbosity(&mut m, Verbosity::Debug).await.unwrap();
    write_verbosity(&mut m, Verbosity::Vomit).await.unwrap();
}
1061
/// A zero length decodes to the empty string with no payload or padding read.
#[tokio::test]
async fn test_read_string_len_0() {
    let mut mock = Builder::new().read(&0u64.to_le_bytes()).build();
    assert_eq!("".to_string(), read_string(&mut mock).await.unwrap());
}
/// A 1-byte string is followed by 7 padding bytes, which are consumed and dropped.
#[tokio::test]
async fn test_read_string_len_1() {
    let mut mock = Builder::new()
        .read(&1u64.to_le_bytes())
        .read("a".as_bytes())
        .read(&[0u8; 7])
        .build();
    assert_eq!("a".to_string(), read_string(&mut mock).await.unwrap());
}
/// An 8-byte string is already aligned, so no padding follows it.
#[tokio::test]
async fn test_read_string_len_8() {
    let mut mock = Builder::new()
        .read(&8u64.to_le_bytes())
        .read("i'm gay.".as_bytes())
        .build();
    assert_eq!(
        "i'm gay.".to_string(),
        read_string(&mut mock).await.unwrap()
    );
}
1088
1089 #[tokio::test]
1090 async fn test_write_string_len_0() {
1091 let mut mock = Builder::new().write(&0u64.to_le_bytes()).build();
1092 write_string(&mut mock, "").await.unwrap();
1093 }
1094 #[tokio::test]
1095 async fn test_write_string_len_1() {
1096 let mut mock = Builder::new()
1097 .write(&1u64.to_le_bytes())
1098 .write("a\0\0\0\0\0\0\0".as_bytes())
1099 .build();
1100 write_string(&mut mock, "a").await.unwrap();
1101 }
1102 #[tokio::test]
1103 async fn test_write_string_len_8() {
1104 let mut mock = Builder::new()
1105 .write(&8u64.to_le_bytes())
1106 .write("i'm gay.".as_bytes())
1107 .build();
1108 write_string(&mut mock, "i'm gay.").await.unwrap();
1109 }
1110
1111 #[tokio::test]
1113 async fn test_read_string_len_1024() {
1114 let mut mock = Builder::new()
1115 .read(&1024u64.to_le_bytes())
1116 .read(&['a' as u8; 1024])
1117 .build();
1118 assert_eq!(
1119 String::from_iter(std::iter::repeat('a').take(1024)),
1120 read_string(&mut mock).await.unwrap()
1121 );
1122 }
1123 #[tokio::test]
1124 async fn test_read_string_len_1025() {
1125 let mut mock = Builder::new()
1126 .read(&1025u64.to_le_bytes())
1127 .read(&['a' as u8; 1025])
1128 .read(&[0u8; 7])
1129 .build();
1130 assert_eq!(
1131 String::from_iter(std::iter::repeat('a').take(1025)),
1132 read_string(&mut mock).await.unwrap()
1133 );
1134 }
1135 #[tokio::test]
1136 async fn test_read_string_len_2048() {
1137 let mut mock = Builder::new()
1138 .read(&2048u64.to_le_bytes())
1139 .read(&['a' as u8; 2048])
1140 .build();
1141 assert_eq!(
1142 String::from_iter(std::iter::repeat('a').take(2048)),
1143 read_string(&mut mock).await.unwrap()
1144 );
1145 }
1146
1147 #[tokio::test]
1148 async fn test_read_strings_0() {
1149 let mut mock = Builder::new().read(&0u64.to_le_bytes()).build();
1150 assert_eq!(
1151 Vec::<String>::new(),
1152 read_strings(&mut mock)
1153 .collect::<Result<Vec<_>>>()
1154 .await
1155 .unwrap()
1156 );
1157 }
1158 #[tokio::test]
1159 async fn test_read_strings_1() {
1160 let mut mock = Builder::new()
1161 .read(&1u64.to_le_bytes())
1162 .read(&8u64.to_le_bytes())
1163 .read("i'm gay.".as_bytes())
1164 .build();
1165 assert_eq!(
1166 vec!["i'm gay.".to_string()],
1167 read_strings(&mut mock)
1168 .collect::<Result<Vec<_>>>()
1169 .await
1170 .unwrap()
1171 );
1172 }
1173 #[tokio::test]
1174 async fn test_read_strings_4() {
1175 let mut mock = Builder::new()
1176 .read(&4u64.to_le_bytes())
1177 .read(&22u64.to_le_bytes())
1178 .read("according to all known\0\0".as_bytes())
1179 .read(&16u64.to_le_bytes())
1180 .read("laws of aviation".as_bytes())
1181 .read(&25u64.to_le_bytes())
1182 .read("there's no way that a bee\0\0\0\0\0\0\0".as_bytes())
1183 .read(&21u64.to_le_bytes())
1184 .read("should be able to fly\0\0\0".as_bytes())
1185 .build();
1186 assert_eq!(
1187 vec![
1188 "according to all known".to_string(),
1189 "laws of aviation".to_string(),
1190 "there's no way that a bee".to_string(),
1191 "should be able to fly".to_string()
1192 ],
1193 read_strings(&mut mock)
1194 .collect::<Result<Vec<_>>>()
1195 .await
1196 .unwrap()
1197 );
1198 }
1199
1200 #[tokio::test]
1201 async fn test_read_pathinfo_derived() {
1202 let mut mock = Builder::new()
1203 .read(&61u64.to_le_bytes()) .read(&pad_str::<64>(
1205 "/nix/store/ffffffffffffffffffffffffffffffff-sqlite-3.43.2.drv",
1206 ))
1207 .read(&51u64.to_le_bytes()) .read(&pad_str::<56>(
1209 "sha256-sUu8vqpIoy7ZpnQPcwvQasNqX2jJOSXeEwd1yFtTukU=",
1210 ))
1211 .read(&2u64.to_le_bytes()) .read(&52u64.to_le_bytes()) .read(&pad_str::<56>(
1214 "/nix/store/ffffffffffffffffffffffffffffffff-zlib-1.3",
1215 ))
1216 .read(&57u64.to_le_bytes()) .read(&pad_str::<64>(
1218 "/nix/store/ffffffffffffffffffffffffffffffff-glibc-2.38-27",
1219 ))
1220 .read(&1700495600u64.to_le_bytes()) .read(&1768960u64.to_le_bytes()) .read(&0u64.to_le_bytes()) .read(&1u64.to_le_bytes()) .read(&106u64.to_le_bytes()) .read(&pad_str::<112>(
1226 "cache.nixos.org-1:Efz+S0y30Eny+nbjeiS0vlUiEpmNbW+m1CiznlC5odPRpTfQUENj+AQcDsnEgvXmaTY9OqG0l5pMIBc6XAk6AQ==",
1227 ))
1228 .read(&0u64.to_le_bytes()) .build();
1230 assert_eq!(
1231 PathInfo {
1232 deriver: Some(
1233 "/nix/store/ffffffffffffffffffffffffffffffff-sqlite-3.43.2.drv".into()
1234 ),
1235 nar_hash: "sha256-sUu8vqpIoy7ZpnQPcwvQasNqX2jJOSXeEwd1yFtTukU=".into(),
1236 references: vec![
1237 "/nix/store/ffffffffffffffffffffffffffffffff-zlib-1.3".into(),
1238 "/nix/store/ffffffffffffffffffffffffffffffff-glibc-2.38-27".into(),
1239 ],
1240 registration_time: Utc.with_ymd_and_hms(2023, 11, 20, 15, 53, 20).unwrap(),
1241 nar_size: 1768960,
1242 ultimate: false,
1243 signatures: vec![
1244 "cache.nixos.org-1:Efz+S0y30Eny+nbjeiS0vlUiEpmNbW+m1CiznlC5odPRpTfQUENj+AQcDsnEgvXmaTY9OqG0l5pMIBc6XAk6AQ==".into(),
1245 ],
1246 ca: None,
1247 },
1248 read_pathinfo(&mut mock, Proto(1, 35)).await.unwrap()
1249 );
1250 }
1251 #[tokio::test]
1252 async fn test_read_pathinfo_ca() {
1253 let mut mock = Builder::new()
1254 .read(&0u64.to_le_bytes()) .read(&51u64.to_le_bytes()) .read(&pad_str::<56>(
1257 "sha256-1JmbR4NOsYNvgbJlqjp+4/bfm22IvhakiE1DXNfx78s=",
1258 ))
1259 .read(&5u64.to_le_bytes()) .read(&60u64.to_le_bytes()) .read(&pad_str::<64>(
1262 "/nix/store/ffffffffffffffffffffffffffffffff-bash-5.2-p15.drv",
1263 ))
1264 .read(&58u64.to_le_bytes()) .read(&pad_str::<64>(
1266 "/nix/store/ffffffffffffffffffffffffffffffff-curl-8.4.0.drv",
1267 ))
1268 .read(&54u64.to_le_bytes()) .read(&pad_str::<56>(
1270 "/nix/store/ffffffffffffffffffffffffffffffff-builder.sh",
1271 ))
1272 .read(&60u64.to_le_bytes()) .read(&pad_str::<64>(
1274 "/nix/store/ffffffffffffffffffffffffffffffff-stdenv-linux.drv",
1275 ))
1276 .read(&60u64.to_le_bytes()) .read(&pad_str::<64>(
1278 "/nix/store/ffffffffffffffffffffffffffffffff-mirrors-list.drv",
1279 ))
1280 .read(&1700854586u64.to_le_bytes()) .read(&3008u64.to_le_bytes()) .read(&0u64.to_le_bytes()) .read(&0u64.to_le_bytes()) .read(&64u64.to_le_bytes()) .read(&pad_str::<64>(
1286 "text:sha256:0yjycizc8v9950dz9a69a7qlzcba9gl2gls8svi1g1i75xxf206d",
1287 ))
1288 .build();
1289 assert_eq!(
1290 PathInfo {
1291 deriver: None,
1292 nar_hash: "sha256-1JmbR4NOsYNvgbJlqjp+4/bfm22IvhakiE1DXNfx78s=".into(),
1293 references: vec![
1294 "/nix/store/ffffffffffffffffffffffffffffffff-bash-5.2-p15.drv".into(),
1295 "/nix/store/ffffffffffffffffffffffffffffffff-curl-8.4.0.drv".into(),
1296 "/nix/store/ffffffffffffffffffffffffffffffff-builder.sh".into(),
1297 "/nix/store/ffffffffffffffffffffffffffffffff-stdenv-linux.drv".into(),
1298 "/nix/store/ffffffffffffffffffffffffffffffff-mirrors-list.drv".into(),
1299 ],
1300 registration_time: Utc.with_ymd_and_hms(2023, 11, 24, 19, 36, 26).unwrap(),
1301 nar_size: 3008,
1302 ultimate: false,
1303 signatures: vec![],
1304 ca: Some("text:sha256:0yjycizc8v9950dz9a69a7qlzcba9gl2gls8svi1g1i75xxf206d".into()),
1305 },
1306 read_pathinfo(&mut mock, Proto(1, 35)).await.unwrap()
1307 );
1308 }
1309
1310 #[tokio::test]
1311 async fn test_write_pathinfo_derived() {
1312 let mut mock = Builder::new()
1313 .write(&61u64.to_le_bytes()) .write(&pad_str::<64>(
1315 "/nix/store/ffffffffffffffffffffffffffffffff-sqlite-3.43.2.drv",
1316 ))
1317 .write(&51u64.to_le_bytes()) .write(&pad_str::<56>(
1319 "sha256-sUu8vqpIoy7ZpnQPcwvQasNqX2jJOSXeEwd1yFtTukU=",
1320 ))
1321 .write(&2u64.to_le_bytes()) .write(&52u64.to_le_bytes()) .write(&pad_str::<56>(
1324 "/nix/store/ffffffffffffffffffffffffffffffff-zlib-1.3",
1325 ))
1326 .write(&57u64.to_le_bytes()) .write(&pad_str::<64>(
1328 "/nix/store/ffffffffffffffffffffffffffffffff-glibc-2.38-27",
1329 ))
1330 .write(&1700495600u64.to_le_bytes()) .write(&1768960u64.to_le_bytes()) .write(&0u64.to_le_bytes()) .write(&1u64.to_le_bytes()) .write(&106u64.to_le_bytes()) .write(&pad_str::<112>(
1336 "cache.nixos.org-1:Efz+S0y30Eny+nbjeiS0vlUiEpmNbW+m1CiznlC5odPRpTfQUENj+AQcDsnEgvXmaTY9OqG0l5pMIBc6XAk6AQ==",
1337 ))
1338 .write(&0u64.to_le_bytes()) .build();
1340 write_pathinfo(
1341 &mut mock,
1342 Proto(1, 35),
1343 &PathInfo {
1344 deriver: Some(
1345 "/nix/store/ffffffffffffffffffffffffffffffff-sqlite-3.43.2.drv".into(),
1346 ),
1347 nar_hash: "sha256-sUu8vqpIoy7ZpnQPcwvQasNqX2jJOSXeEwd1yFtTukU=".into(),
1348 references: vec![
1349 "/nix/store/ffffffffffffffffffffffffffffffff-zlib-1.3".into(),
1350 "/nix/store/ffffffffffffffffffffffffffffffff-glibc-2.38-27".into(),
1351 ],
1352 registration_time: Utc.with_ymd_and_hms(2023, 11, 20, 15, 53, 20).unwrap(),
1353 nar_size: 1768960,
1354 ultimate: false,
1355 signatures: vec![
1356 "cache.nixos.org-1:Efz+S0y30Eny+nbjeiS0vlUiEpmNbW+m1CiznlC5odPRpTfQUENj+AQcDsnEgvXmaTY9OqG0l5pMIBc6XAk6AQ==".into(),
1357 ],
1358 ca: None,
1359 },
1360 )
1361 .await
1362 .unwrap();
1363 }
1364 #[tokio::test]
1365 async fn test_write_pathinfo_ca() {
1366 let mut mock = Builder::new()
1367 .write(&0u64.to_le_bytes()) .write(&51u64.to_le_bytes()) .write(&pad_str::<56>(
1370 "sha256-1JmbR4NOsYNvgbJlqjp+4/bfm22IvhakiE1DXNfx78s=",
1371 ))
1372 .write(&5u64.to_le_bytes()) .write(&60u64.to_le_bytes()) .write(&pad_str::<64>(
1375 "/nix/store/ffffffffffffffffffffffffffffffff-bash-5.2-p15.drv",
1376 ))
1377 .write(&58u64.to_le_bytes()) .write(&pad_str::<64>(
1379 "/nix/store/ffffffffffffffffffffffffffffffff-curl-8.4.0.drv",
1380 ))
1381 .write(&54u64.to_le_bytes()) .write(&pad_str::<56>(
1383 "/nix/store/ffffffffffffffffffffffffffffffff-builder.sh",
1384 ))
1385 .write(&60u64.to_le_bytes()) .write(&pad_str::<64>(
1387 "/nix/store/ffffffffffffffffffffffffffffffff-stdenv-linux.drv",
1388 ))
1389 .write(&60u64.to_le_bytes()) .write(&pad_str::<64>(
1391 "/nix/store/ffffffffffffffffffffffffffffffff-mirrors-list.drv",
1392 ))
1393 .write(&1700854586u64.to_le_bytes()) .write(&3008u64.to_le_bytes()) .write(&0u64.to_le_bytes()) .write(&0u64.to_le_bytes()) .write(&64u64.to_le_bytes()) .write(&pad_str::<64>(
1399 "text:sha256:0yjycizc8v9950dz9a69a7qlzcba9gl2gls8svi1g1i75xxf206d",
1400 ))
1401 .build();
1402 write_pathinfo(
1403 &mut mock,
1404 Proto(1, 35),
1405 &PathInfo {
1406 deriver: None,
1407 nar_hash: "sha256-1JmbR4NOsYNvgbJlqjp+4/bfm22IvhakiE1DXNfx78s=".into(),
1408 references: vec![
1409 "/nix/store/ffffffffffffffffffffffffffffffff-bash-5.2-p15.drv".into(),
1410 "/nix/store/ffffffffffffffffffffffffffffffff-curl-8.4.0.drv".into(),
1411 "/nix/store/ffffffffffffffffffffffffffffffff-builder.sh".into(),
1412 "/nix/store/ffffffffffffffffffffffffffffffff-stdenv-linux.drv".into(),
1413 "/nix/store/ffffffffffffffffffffffffffffffff-mirrors-list.drv".into(),
1414 ],
1415 registration_time: Utc.with_ymd_and_hms(2023, 11, 24, 19, 36, 26).unwrap(),
1416 nar_size: 3008,
1417 ultimate: false,
1418 signatures: vec![],
1419 ca: Some("text:sha256:0yjycizc8v9950dz9a69a7qlzcba9gl2gls8svi1g1i75xxf206d".into()),
1420 },
1421 )
1422 .await
1423 .unwrap();
1424 }
1425
1426 #[tokio::test]
1429 #[allow(non_snake_case)]
1430 async fn test_cppnix__src_libstore_worker_protocol__string() {
1431 let mut mock = Builder::new()
1432 .write(&[
1433 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00,
1435 0x00, 0x00, 0x68, 0x69, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x00, 0x00,
1436 0x00, 0x00, 0x00, 0x00, 0x77, 0x68, 0x69, 0x74, 0x65, 0x20, 0x72, 0x61, 0x62, 0x62,
1437 0x69, 0x74, 0x00, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1438 0xe5, 0xa4, 0xa7, 0xe7, 0x99, 0xbd, 0xe5, 0x85, 0x94, 0x00, 0x00, 0x00, 0x00, 0x00,
1439 0x00, 0x00, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x6f, 0x68, 0x20, 0x6e,
1440 0x6f, 0x20, 0x00, 0x00,
1441 ])
1442 .build();
1443
1444 write_string(&mut mock, "").await.unwrap();
1446 write_string(&mut mock, "hi").await.unwrap();
1447 write_string(&mut mock, "white rabbit").await.unwrap();
1448 write_string(&mut mock, "大白兔").await.unwrap();
1449 write_string(&mut mock, "oh no \0\0\0 what was that!")
1450 .await
1451 .unwrap();
1452 }
1453
1454 #[tokio::test]
1455 async fn test_framedreader_empty() {
1456 let mut mock = Builder::new().read(&0u64.to_le_bytes()).build();
1457 let mut buf = Vec::new();
1458 let len = FramedReader::new(&mut mock)
1459 .read_to_end(&mut buf)
1460 .await
1461 .unwrap();
1462 assert_eq!(0, len);
1463 assert_eq!(0, buf.len());
1464 }
1465
1466 #[tokio::test]
1467 async fn test_framedreader_1f() {
1468 let mut mock = Builder::new()
1469 .read(&2u64.to_le_bytes())
1470 .read(&[1, 2])
1471 .read(&0u64.to_le_bytes())
1472 .build();
1473 let mut buf = Vec::new();
1474 let len = FramedReader::new(&mut mock)
1475 .read_to_end(&mut buf)
1476 .await
1477 .unwrap();
1478 assert_eq!(&[1, 2], &buf[..]);
1479 assert_eq!(2, len);
1480 }
1481
1482 #[tokio::test]
1483 async fn test_framedreader_2f() {
1484 let mut mock = Builder::new()
1485 .read(&2u64.to_le_bytes())
1486 .read(&[1, 2])
1487 .read(&4u64.to_le_bytes())
1488 .read(&[3, 4, 5, 6])
1489 .read(&0u64.to_le_bytes())
1490 .build();
1491 let mut buf = Vec::new();
1492 let len = FramedReader::new(&mut mock)
1493 .read_to_end(&mut buf)
1494 .await
1495 .unwrap();
1496 assert_eq!(&[0x01, 0x02, 0x03, 0x04, 0x05, 0x06], &buf[..]);
1497 assert_eq!(6, len);
1498 }
1499
1500 #[tokio::test]
1501 async fn test_framedreader_2f_overflow() {
1502 let mut mock = Builder::new()
1503 .read(&2u64.to_le_bytes())
1504 .read(&[1, 2])
1505 .read(&4u64.to_le_bytes())
1506 .read(&[3, 4, 5, 6])
1507 .read(&0u64.to_le_bytes())
1508 .build();
1509 let mut buf = [0u8; 2];
1510 let mut r = FramedReader::new(&mut mock);
1511 assert_eq!(2, r.read(&mut buf).await.unwrap());
1512 assert_eq!(&[1, 2], &buf[..]);
1513 assert_eq!(2, r.read(&mut buf).await.unwrap());
1514 assert_eq!(&[3, 4], &buf[..]);
1515 assert_eq!(2, r.read(&mut buf).await.unwrap());
1516 assert_eq!(&[5, 6], &buf[..]);
1517 assert_eq!(0, r.read(&mut buf).await.unwrap());
1518 }
1519}