1pub mod wire;
14
15use crate::{
16 BuildMode, ClientSettings, Error, PathInfo, Progress, Result, ResultExt, Stderr, Store,
17};
18use std::future::Future;
19use std::{collections::HashMap, fmt::Debug};
20use tokio::{
21 io::{AsyncReadExt, AsyncWriteExt},
22 net::UnixStream,
23};
24use tokio_stream::StreamExt;
25use tracing::{info, instrument};
26
/// Oldest protocol version accepted from a peer during a handshake.
const MIN_PROTO: Proto = Proto(1, 35);
/// Protocol version this implementation announces to its peer.
const MAX_PROTO: Proto = Proto(1, 35);

/// A protocol version stored as a `(major, minor)` pair.
///
/// Comparison is derived, so versions order by major first and minor second.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Proto(u8, u8);

impl From<u64> for Proto {
    /// Decodes a version from its integer form: bits 8..16 are the major,
    /// bits 0..8 the minor. Any higher bits are ignored.
    fn from(raw: u64) -> Self {
        // Truncating casts keep exactly the low eight bits of each operand.
        let major = (raw >> 8) as u8;
        let minor = raw as u8;
        Self(major, minor)
    }
}

impl From<Proto> for u64 {
    /// Encodes the version as `(major << 8) | minor`.
    fn from(v: Proto) -> Self {
        let Proto(major, minor) = v;
        (u64::from(major) << 8) | u64::from(minor)
    }
}

impl std::fmt::Display for Proto {
    /// Formats the version as `major.minor`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(major, minor) = self;
        write!(f, "{major}.{minor}")
    }
}

impl Proto {
    /// Returns `true` when the minor version is at least `v`; the major is not consulted.
    fn since(&self, v: u8) -> bool {
        v <= self.1
    }
}
67
/// The request half of a daemon operation.
///
/// `DaemonProgress` takes the caller out of its slot and runs it exactly once, on the
/// first call to `next`, before any stderr message is read.
trait DaemonProgressCaller {
    /// Consumes the caller and performs its work against `store`.
    ///
    /// The error type `E` is chosen by the invoker; it only has to be constructible
    /// from the crate's `Error` and from `std::io::Error`.
    fn call<
        E: From<Error> + From<std::io::Error> + Send + Sync,
        C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
    >(
        self,
        store: &mut DaemonStore<C>,
    ) -> impl Future<Output = Result<(), E>> + Send;
}
77
/// The reply half of a daemon operation.
///
/// `DaemonProgress::result` runs it after `next` has reported the end of the stderr
/// stream, to produce the operation's final value.
trait DaemonProgressReturner {
    /// Value produced by the operation.
    type T: Send;
    /// Consumes the returner and produces the operation's value using `store`.
    fn result<
        E: From<Error> + From<std::io::Error> + Send + Sync,
        C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
    >(
        self,
        store: &mut DaemonStore<C>,
    ) -> impl Future<Output = Result<Self::T, E>> + Send;
}
88
/// State of one in-flight daemon operation: a pending request, the stderr stream that
/// follows it, and the reader for the final result.
struct DaemonProgress<'s, PC, PR, T: Send, C>
where
    C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
    PC: DaemonProgressCaller + Send,
    PR: DaemonProgressReturner<T = T> + Send,
{
    /// Store whose connection the operation runs on.
    store: &'s mut DaemonStore<C>,
    /// Set once `read_stderr` has returned `None`; `next` then yields `Ok(None)` without reading.
    fuse: bool,
    /// Request that has not been run yet; taken and run by the first `next` call.
    caller: Option<PC>,
    /// Produces the final value in `result`.
    returner: PR,
}
101impl<'s, PC, PR, T: Send, C> DaemonProgress<'s, PC, PR, T, C>
102where
103 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
104 PC: DaemonProgressCaller + Send,
105 PR: DaemonProgressReturner<T = T> + Send,
106{
107 fn new(store: &'s mut DaemonStore<C>, caller: PC, returner: PR) -> Self {
108 Self {
109 store,
110 fuse: false,
111 caller: Some(caller),
112 returner,
113 }
114 }
115}
116impl<'s, PC, PR, T: Send, C> Progress for DaemonProgress<'s, PC, PR, T, C>
117where
118 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
119 PC: DaemonProgressCaller + Send,
120 PR: DaemonProgressReturner<T = T> + Send,
121{
122 type T = T;
123 type Error = Error;
124
125 async fn next(&mut self) -> Result<Option<Stderr>> {
126 let store = &mut self.store;
127 if let Some(caller) = self.caller.take() {
128 caller.call::<Error, C>(store).await?;
129 }
130 if self.fuse {
131 Ok(None)
132 } else {
133 match wire::read_stderr(&mut store.conn).await? {
134 Some(Stderr::Error(err)) => Err(Error::NixError(err)),
135 Some(stderr) => Ok(Some(stderr)),
136 None => {
137 self.fuse = true;
138 Ok(None)
139 }
140 }
141 }
142 }
143
144 async fn result(mut self) -> Result<Self::T> {
145 while let Some(_) = self.next().await? {}
146 self.returner.result(self.store).await
147 }
148}
149
/// Builder for `DaemonStore`; it currently has no fields of its own.
#[derive(Debug, Default)]
pub struct DaemonStoreBuilder {
}
154
155impl DaemonStoreBuilder {
156 pub async fn init<C: AsyncReadExt + AsyncWriteExt + Unpin>(
173 self,
174 conn: C,
175 ) -> Result<DaemonStore<C>> {
176 let mut store = DaemonStore {
177 conn,
178 buffer: [0u8; 1024],
179 proto: Proto(0, 0),
180 };
181 store.handshake().await?;
182 Ok(store)
183 }
184
185 pub async fn connect_unix<P: AsRef<std::path::Path>>(
199 self,
200 path: P,
201 ) -> Result<DaemonStore<UnixStream>> {
202 self.init(UnixStream::connect(path).await?).await
203 }
204}
205
/// A store backed by a daemon reached over the connection `C`.
#[derive(Debug)]
pub struct DaemonStore<C: AsyncReadExt + AsyncWriteExt + Unpin> {
    /// Connection to the daemon.
    conn: C,
    /// Scratch buffer handed to `wire::copy_to_framed` when streaming data to the daemon.
    buffer: [u8; 1024],
    /// Protocol version recorded by the handshake.
    pub proto: Proto,
}
227
228impl DaemonStore<UnixStream> {
229 pub fn builder() -> DaemonStoreBuilder {
231 DaemonStoreBuilder::default()
232 }
233}
234
235impl<C: AsyncReadExt + AsyncWriteExt + Unpin> DaemonStore<C> {
236 #[instrument(skip(self))]
237 async fn handshake(&mut self) -> Result<()> {
238 wire::write_u64(&mut self.conn, wire::WORKER_MAGIC_1)
240 .await
241 .with_field("magic1")?;
242 match wire::read_u64(&mut self.conn).await {
243 Ok(magic2 @ wire::WORKER_MAGIC_2) => Ok(magic2),
244 Ok(v) => Err(Error::Invalid(format!("{:#x}", v))),
245 Err(err) => Err(err.into()),
246 }
247 .with_field("magic2")?;
248
249 self.proto = wire::read_proto(&mut self.conn)
251 .await
252 .and_then(|proto| {
253 if proto.0 != 1 || proto < MIN_PROTO {
254 return Err(Error::Invalid(format!("{}", proto)));
255 }
256 Ok(proto)
257 })
258 .with_field("daemon_proto")?;
259 wire::write_proto(&mut self.conn, MAX_PROTO)
260 .await
261 .with_field("client_proto")?;
262
263 if self.proto >= Proto(1, 14) {
265 wire::write_u64(&mut self.conn, 0)
266 .await
267 .with_field("__obsolete_cpu_affinity")?;
268 }
269 if self.proto >= Proto(1, 11) {
270 wire::write_bool(&mut self.conn, false)
271 .await
272 .with_field("__obsolete_reserve_space")?;
273 }
274
275 if self.proto >= Proto(1, 33) {
277 wire::read_string(&mut self.conn)
278 .await
279 .with_field("nix_version")?;
280 }
281 if self.proto >= Proto(1, 35) {
282 wire::read_u64(&mut self.conn)
284 .await
285 .with_field("remote_trust")?;
286 }
287
288 while let Some(_) = wire::read_stderr(&mut self.conn).await? {}
290 Ok(())
291 }
292}
293
294impl<C: AsyncReadExt + AsyncWriteExt + Unpin + Send> Store for DaemonStore<C> {
295 type Error = Error;
296
297 #[instrument(skip(self))]
298 fn is_valid_path<S: AsRef<str> + Send + Sync + Debug>(
299 &mut self,
300 path: S,
301 ) -> impl Progress<T = bool, Error = Self::Error> {
302 struct Caller<S: AsRef<str> + Send + Sync + Debug> {
312 path: S,
313 }
314 impl<S: AsRef<str> + Send + Sync + Debug> DaemonProgressCaller for Caller<S> {
315 async fn call<
316 E: From<Error> + From<std::io::Error> + Send + Sync,
317 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
318 >(
319 self,
320 store: &mut DaemonStore<C>,
321 ) -> Result<(), E> {
322 wire::write_op(&mut store.conn, wire::Op::IsValidPath)
323 .await
324 .with_field("IsValidPath.<op>")?;
325 wire::write_string(&mut store.conn, &self.path)
326 .await
327 .with_field("IsValidPath.path")?;
328 Ok(())
329 }
330 }
331
332 struct Returner;
333 impl DaemonProgressReturner for Returner {
334 type T = bool;
335 async fn result<
336 E: From<Error> + From<std::io::Error> + Send + Sync,
337 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
338 >(
339 self,
340 store: &mut DaemonStore<C>,
341 ) -> Result<Self::T, E> {
342 Ok(wire::read_bool(&mut store.conn).await?)
343 }
344 }
345
346 DaemonProgress::new(self, Caller { path }, Returner)
347 }
348
349 #[instrument(skip(self))]
350 fn has_substitutes<P: AsRef<str> + Send + Sync + Debug>(
351 &mut self,
352 path: P,
353 ) -> impl Progress<T = bool, Error = Self::Error> {
354 struct Caller<P: AsRef<str> + Send + Sync + Debug> {
355 path: P,
356 }
357 impl<P: AsRef<str> + Send + Sync + Debug> DaemonProgressCaller for Caller<P> {
358 async fn call<
359 E: From<Error> + From<std::io::Error> + Send + Sync,
360 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
361 >(
362 self,
363 store: &mut DaemonStore<C>,
364 ) -> Result<(), E> {
365 wire::write_op(&mut store.conn, wire::Op::HasSubstitutes)
366 .await
367 .with_field("HasSubstitutes.<op>")?;
368 wire::write_string(&mut store.conn, &self.path)
369 .await
370 .with_field("HasSubstitutes.path")?;
371 Ok(())
372 }
373 }
374
375 struct Returner;
376 impl DaemonProgressReturner for Returner {
377 type T = bool;
378 async fn result<
379 E: From<Error> + From<std::io::Error> + Send + Sync,
380 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
381 >(
382 self,
383 store: &mut DaemonStore<C>,
384 ) -> Result<Self::T, E> {
385 Ok(wire::read_bool(&mut store.conn).await?)
386 }
387 }
388
389 DaemonProgress::new(self, Caller { path }, Returner)
390 }
391
392 #[instrument(skip(self, source))]
393 fn add_to_store<
394 SN: AsRef<str> + Send + Sync + Debug,
395 SC: AsRef<str> + Send + Sync + Debug,
396 Refs,
397 R,
398 >(
399 &mut self,
400 name: SN,
401 cam_str: SC,
402 refs: Refs,
403 repair: bool,
404 source: R,
405 ) -> impl Progress<T = (String, PathInfo), Error = Self::Error>
406 where
407 Refs: IntoIterator + Send + Debug,
408 Refs::IntoIter: ExactSizeIterator + Send,
409 Refs::Item: AsRef<str> + Send + Sync,
410 R: AsyncReadExt + Unpin + Send + Debug,
411 {
412 struct Caller<
413 SN: AsRef<str> + Send + Sync + Debug,
414 SC: AsRef<str> + Send + Sync + Debug,
415 Refs,
416 R,
417 >
418 where
419 Refs: IntoIterator + Send + Debug,
420 Refs::IntoIter: ExactSizeIterator + Send,
421 Refs::Item: AsRef<str> + Send + Sync,
422 R: AsyncReadExt + Unpin + Send + Debug,
423 {
424 name: SN,
425 cam_str: SC,
426 refs: Refs,
427 repair: bool,
428 source: R,
429 }
430 impl<
431 SN: AsRef<str> + Send + Sync + Debug,
432 SC: AsRef<str> + Send + Sync + Debug,
433 Refs,
434 R,
435 > DaemonProgressCaller for Caller<SN, SC, Refs, R>
436 where
437 Refs: IntoIterator + Send + Debug,
438 Refs::IntoIter: ExactSizeIterator + Send,
439 Refs::Item: AsRef<str> + Send + Sync,
440 R: AsyncReadExt + Unpin + Send + Debug,
441 {
442 async fn call<
443 E: From<Error> + From<std::io::Error> + Send + Sync,
444 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
445 >(
446 mut self,
447 store: &mut DaemonStore<C>,
448 ) -> Result<(), E> {
449 match store.proto {
450 Proto(1, 25..) => {
451 wire::write_op(&mut store.conn, wire::Op::AddToStore)
452 .await
453 .with_field("AddToStore.<op>")?;
454 wire::write_string(&mut store.conn, self.name)
455 .await
456 .with_field("AddToStore.name")?;
457 wire::write_string(&mut store.conn, self.cam_str)
458 .await
459 .with_field("AddToStore.camStr")?;
460 wire::write_strings(&mut store.conn, self.refs)
461 .await
462 .with_field("AddToStore.refs")?;
463 wire::write_bool(&mut store.conn, self.repair)
464 .await
465 .with_field("AddToStore.repair")?;
466 wire::copy_to_framed(&mut self.source, &mut store.conn, &mut store.buffer)
467 .await
468 .with_field("AddToStore.<source>")?;
469 Ok(())
470 }
471 proto => Err(Error::Invalid(format!(
472 "AddToStore is not implemented for Protocol {}",
473 proto
474 ))
475 .into()),
476 }
477 }
478 }
479
480 struct Returner;
481 impl DaemonProgressReturner for Returner {
482 type T = (String, PathInfo);
483 async fn result<
484 E: From<Error> + From<std::io::Error> + Send + Sync,
485 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
486 >(
487 self,
488 store: &mut DaemonStore<C>,
489 ) -> Result<Self::T, E> {
490 Ok((
491 wire::read_string(&mut store.conn)
492 .await
493 .with_field("name")?,
494 wire::read_pathinfo(&mut store.conn, store.proto)
495 .await
496 .with_field("PathInfo")?,
497 ))
498 }
499 }
500
501 DaemonProgress::new(
502 self,
503 Caller {
504 name,
505 cam_str,
506 refs,
507 repair,
508 source,
509 },
510 Returner,
511 )
512 }
513
514 #[instrument(skip(self))]
515 fn build_paths<Paths>(
516 &mut self,
517 paths: Paths,
518 mode: BuildMode,
519 ) -> impl Progress<T = (), Error = Self::Error>
520 where
521 Paths: IntoIterator + Send + Debug,
522 Paths::IntoIter: ExactSizeIterator + Send,
523 Paths::Item: AsRef<str> + Send + Sync,
524 {
525 struct Caller<Paths>
526 where
527 Paths: IntoIterator + Send + Debug,
528 Paths::IntoIter: ExactSizeIterator + Send,
529 Paths::Item: AsRef<str> + Send + Sync,
530 {
531 paths: Paths,
532 mode: BuildMode,
533 }
534 impl<Paths> DaemonProgressCaller for Caller<Paths>
535 where
536 Paths: IntoIterator + Send + Debug,
537 Paths::IntoIter: ExactSizeIterator + Send,
538 Paths::Item: AsRef<str> + Send + Sync,
539 {
540 async fn call<
541 E: From<Error> + From<std::io::Error> + Send + Sync,
542 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
543 >(
544 self,
545 store: &mut DaemonStore<C>,
546 ) -> Result<(), E> {
547 wire::write_op(&mut store.conn, wire::Op::BuildPaths)
548 .await
549 .with_field("BuildPaths.<op>")?;
550 wire::write_strings(&mut store.conn, self.paths)
551 .await
552 .with_field("BuildPaths.paths")?;
553 if store.proto >= Proto(1, 15) {
554 wire::write_build_mode(&mut store.conn, self.mode)
555 .await
556 .with_field("BuildPaths.build_mode")?;
557 }
558 Ok(())
559 }
560 }
561
562 struct Returner;
563 impl DaemonProgressReturner for Returner {
564 type T = ();
565 async fn result<
566 E: From<Error> + From<std::io::Error> + Send + Sync,
567 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
568 >(
569 self,
570 store: &mut DaemonStore<C>,
571 ) -> Result<Self::T, E> {
572 wire::read_u64(&mut store.conn)
573 .await
574 .with_field("__unused__")?;
575 Ok(())
576 }
577 }
578
579 DaemonProgress::new(self, Caller { paths, mode }, Returner)
580 }
581
582 #[instrument(skip(self))]
583 fn ensure_path<Path: AsRef<str> + Send + Sync + Debug>(
584 &mut self,
585 path: Path,
586 ) -> impl Progress<T = (), Error = Self::Error> {
587 struct Caller<Path: AsRef<str> + Send + Sync + Debug> {
588 path: Path,
589 }
590 impl<Path: AsRef<str> + Send + Sync + Debug> DaemonProgressCaller for Caller<Path> {
591 async fn call<
592 E: From<Error> + From<std::io::Error> + Send + Sync,
593 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
594 >(
595 self,
596 store: &mut DaemonStore<C>,
597 ) -> Result<(), E> {
598 wire::write_op(&mut store.conn, wire::Op::EnsurePath)
599 .await
600 .with_field("EnsurePath.<op>")?;
601 wire::write_string(&mut store.conn, self.path)
602 .await
603 .with_field("EnsurePath.path")?;
604 Ok(())
605 }
606 }
607
608 struct Returner;
609 impl DaemonProgressReturner for Returner {
610 type T = ();
611 async fn result<
612 E: From<Error> + From<std::io::Error> + Send + Sync,
613 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
614 >(
615 self,
616 store: &mut DaemonStore<C>,
617 ) -> Result<Self::T, E> {
618 wire::read_u64(&mut store.conn)
619 .await
620 .with_field("__unused__")?;
621 Ok(())
622 }
623 }
624
625 DaemonProgress::new(self, Caller { path }, Returner)
626 }
627
628 #[instrument(skip(self))]
629 fn add_temp_root<Path: AsRef<str> + Send + Sync + Debug>(
630 &mut self,
631 path: Path,
632 ) -> impl Progress<T = (), Error = Self::Error> {
633 struct Caller<Path: AsRef<str> + Send + Sync + Debug> {
634 path: Path,
635 }
636 impl<Path: AsRef<str> + Send + Sync + Debug> DaemonProgressCaller for Caller<Path> {
637 async fn call<
638 E: From<Error> + From<std::io::Error> + Send + Sync,
639 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
640 >(
641 self,
642 store: &mut DaemonStore<C>,
643 ) -> Result<(), E> {
644 wire::write_op(&mut store.conn, wire::Op::AddTempRoot)
645 .await
646 .with_field("AddTempRoot.<op>")?;
647 wire::write_string(&mut store.conn, self.path)
648 .await
649 .with_field("AddTempRoot.path")?;
650 Ok(())
651 }
652 }
653
654 struct Returner;
655 impl DaemonProgressReturner for Returner {
656 type T = ();
657 async fn result<
658 E: From<Error> + From<std::io::Error> + Send + Sync,
659 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
660 >(
661 self,
662 store: &mut DaemonStore<C>,
663 ) -> Result<Self::T, E> {
664 wire::read_u64(&mut store.conn)
665 .await
666 .with_field("__unused__")?;
667 Ok(())
668 }
669 }
670
671 DaemonProgress::new(self, Caller { path }, Returner)
672 }
673
674 #[instrument(skip(self))]
675 fn add_indirect_root<Path: AsRef<str> + Send + Sync + Debug>(
676 &mut self,
677 path: Path,
678 ) -> impl Progress<T = (), Error = Self::Error> {
679 struct Caller<Path: AsRef<str> + Send + Sync + Debug> {
680 path: Path,
681 }
682 impl<Path: AsRef<str> + Send + Sync + Debug> DaemonProgressCaller for Caller<Path> {
683 async fn call<
684 E: From<Error> + From<std::io::Error> + Send + Sync,
685 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
686 >(
687 self,
688 store: &mut DaemonStore<C>,
689 ) -> Result<(), E> {
690 wire::write_op(&mut store.conn, wire::Op::AddIndirectRoot)
691 .await
692 .with_field("AddIndirectRoot.<op>")?;
693 wire::write_string(&mut store.conn, self.path)
694 .await
695 .with_field("AddIndirectRoot.path")?;
696 Ok(())
697 }
698 }
699
700 struct Returner;
701 impl DaemonProgressReturner for Returner {
702 type T = ();
703 async fn result<
704 E: From<Error> + From<std::io::Error> + Send + Sync,
705 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
706 >(
707 self,
708 store: &mut DaemonStore<C>,
709 ) -> Result<Self::T, E> {
710 wire::read_u64(&mut store.conn)
711 .await
712 .with_field("__unused__")?;
713 Ok(())
714 }
715 }
716
717 DaemonProgress::new(self, Caller { path }, Returner)
718 }
719
720 #[instrument(skip(self))]
721 fn find_roots(&mut self) -> impl Progress<T = HashMap<String, String>, Error = Self::Error> {
722 struct Caller;
723 impl DaemonProgressCaller for Caller {
724 async fn call<
725 E: From<Error> + From<std::io::Error> + Send + Sync,
726 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
727 >(
728 self,
729 store: &mut DaemonStore<C>,
730 ) -> Result<(), E> {
731 wire::write_op(&mut store.conn, wire::Op::FindRoots)
732 .await
733 .with_field("FindRoots.<op>")?;
734 Ok(())
735 }
736 }
737
738 struct Returner;
739 impl DaemonProgressReturner for Returner {
740 type T = HashMap<String, String>;
741 async fn result<
742 E: From<Error> + From<std::io::Error> + Send + Sync,
743 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
744 >(
745 self,
746 store: &mut DaemonStore<C>,
747 ) -> Result<Self::T, E> {
748 let count = wire::read_u64(&mut store.conn)
749 .await
750 .with_field("FindRoots.roots[].<count>")?;
751 let mut roots = HashMap::with_capacity(count as usize);
752 for _ in 0..count {
753 let link = wire::read_string(&mut store.conn)
754 .await
755 .with_field("FindRoots.roots[].link")?;
756 let target = wire::read_string(&mut store.conn)
757 .await
758 .with_field("FindRoots.roots[].target")?;
759 roots.insert(link, target);
760 }
761 Ok(roots)
762 }
763 }
764
765 DaemonProgress::new(self, Caller, Returner)
766 }
767
768 #[instrument(skip(self))]
769 fn set_options(&mut self, opts: ClientSettings) -> impl Progress<T = (), Error = Self::Error> {
770 struct Caller {
771 opts: ClientSettings,
772 }
773 impl DaemonProgressCaller for Caller {
774 async fn call<
775 E: From<Error> + From<std::io::Error> + Send + Sync,
776 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
777 >(
778 self,
779 store: &mut DaemonStore<C>,
780 ) -> Result<(), E> {
781 wire::write_op(&mut store.conn, wire::Op::SetOptions)
782 .await
783 .with_field("SetOptions.<op>")?;
784 wire::write_client_settings(&mut store.conn, store.proto, &self.opts)
785 .await
786 .with_field("SetOptions.clientSettings")?;
787 Ok(())
788 }
789 }
790
791 struct Returner;
792 impl DaemonProgressReturner for Returner {
793 type T = ();
794 async fn result<
795 E: From<Error> + From<std::io::Error> + Send + Sync,
796 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
797 >(
798 self,
799 _store: &mut DaemonStore<C>,
800 ) -> Result<Self::T, E> {
801 Ok(())
802 }
803 }
804
805 DaemonProgress::new(self, Caller { opts }, Returner)
806 }
807
808 #[instrument(skip(self))]
809 fn query_pathinfo<S: AsRef<str> + Send + Sync + Debug>(
810 &mut self,
811 path: S,
812 ) -> impl Progress<T = Option<PathInfo>, Error = Self::Error> {
813 struct Caller<S: AsRef<str> + Send + Sync + Debug> {
814 path: S,
815 }
816 impl<S: AsRef<str> + Send + Sync + Debug> DaemonProgressCaller for Caller<S> {
817 async fn call<
818 E: From<Error> + From<std::io::Error> + Send + Sync,
819 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
820 >(
821 self,
822 store: &mut DaemonStore<C>,
823 ) -> Result<(), E> {
824 wire::write_op(&mut store.conn, wire::Op::QueryPathInfo)
825 .await
826 .with_field("QueryPathInfo.<op>")?;
827 wire::write_string(&mut store.conn, &self.path)
828 .await
829 .with_field("QueryPathInfo.path")?;
830 Ok(())
831 }
832 }
833
834 struct Returner;
835 impl DaemonProgressReturner for Returner {
836 type T = Option<PathInfo>;
837 async fn result<
838 E: From<Error> + From<std::io::Error> + Send + Sync,
839 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
840 >(
841 self,
842 store: &mut DaemonStore<C>,
843 ) -> Result<Self::T, E> {
844 if wire::read_bool(&mut store.conn).await? {
845 Ok(Some(
846 wire::read_pathinfo(&mut store.conn, store.proto).await?,
847 ))
848 } else {
849 Ok(None)
850 }
851 }
852 }
853
854 DaemonProgress::new(self, Caller { path }, Returner)
855 }
856
857 #[instrument(skip(self))]
858 fn query_valid_paths<Paths>(
859 &mut self,
860 paths: Paths,
861 use_substituters: bool,
862 ) -> impl Progress<T = Vec<String>, Error = Self::Error>
863 where
864 Paths: IntoIterator + Send + Debug,
865 Paths::IntoIter: ExactSizeIterator + Send,
866 Paths::Item: AsRef<str> + Send + Sync,
867 {
868 struct Caller<Paths>
869 where
870 Paths: IntoIterator + Send + Debug,
871 Paths::IntoIter: ExactSizeIterator + Send,
872 Paths::Item: AsRef<str> + Send + Sync,
873 {
874 paths: Paths,
875 use_substituters: bool,
876 }
877 impl<Paths> DaemonProgressCaller for Caller<Paths>
878 where
879 Paths: IntoIterator + Send + Debug,
880 Paths::IntoIter: ExactSizeIterator + Send,
881 Paths::Item: AsRef<str> + Send + Sync,
882 {
883 async fn call<
884 E: From<Error> + From<std::io::Error> + Send + Sync,
885 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
886 >(
887 self,
888 store: &mut DaemonStore<C>,
889 ) -> Result<(), E> {
890 match store.proto {
891 Proto(1, 12..) => {
892 wire::write_op(&mut store.conn, wire::Op::QueryValidPaths)
893 .await
894 .with_field("QueryValidPaths.<op>")?;
895 wire::write_strings(&mut store.conn, self.paths)
896 .await
897 .with_field("QueryValidPaths.path")?;
898 if store.proto >= Proto(1, 27) {
899 wire::write_bool(&mut store.conn, self.use_substituters)
900 .await
901 .with_field("QueryValidPaths.use_substituters")?;
902 }
903 Ok(())
904 }
905 proto => Err(Error::Invalid(format!(
906 "QueryValidPaths is not implemented for Protocol {}",
907 proto
908 ))
909 .into()),
910 }
911 }
912 }
913
914 struct Returner;
915 impl DaemonProgressReturner for Returner {
916 type T = Vec<String>;
917 async fn result<
918 E: From<Error> + From<std::io::Error> + Send + Sync,
919 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
920 >(
921 self,
922 store: &mut DaemonStore<C>,
923 ) -> Result<Self::T, E> {
924 Ok(wire::read_strings(&mut store.conn)
925 .collect::<Result<Vec<_>>>()
926 .await?)
927 }
928 }
929
930 DaemonProgress::new(
931 self,
932 Caller {
933 paths,
934 use_substituters,
935 },
936 Returner,
937 )
938 }
939
940 #[instrument(skip(self))]
941 fn query_substitutable_paths<Paths>(
942 &mut self,
943 paths: Paths,
944 ) -> impl Progress<T = Vec<String>, Error = Self::Error>
945 where
946 Paths: IntoIterator + Send + Debug,
947 Paths::IntoIter: ExactSizeIterator + Send,
948 Paths::Item: AsRef<str> + Send + Sync,
949 {
950 struct Caller<Paths>
951 where
952 Paths: IntoIterator + Send + Debug,
953 Paths::IntoIter: ExactSizeIterator + Send,
954 Paths::Item: AsRef<str> + Send + Sync,
955 {
956 paths: Paths,
957 }
958 impl<Paths> DaemonProgressCaller for Caller<Paths>
959 where
960 Paths: IntoIterator + Send + Debug,
961 Paths::IntoIter: ExactSizeIterator + Send,
962 Paths::Item: AsRef<str> + Send + Sync,
963 {
964 async fn call<
965 E: From<Error> + From<std::io::Error> + Send + Sync,
966 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
967 >(
968 self,
969 store: &mut DaemonStore<C>,
970 ) -> Result<(), E> {
971 wire::write_op(&mut store.conn, wire::Op::QuerySubstitutablePaths)
972 .await
973 .with_field("QuerySubstitutablePaths.<op>")?;
974 wire::write_strings(&mut store.conn, self.paths)
975 .await
976 .with_field("QuerySubstitutablePaths.path")?;
977 Ok(())
978 }
979 }
980
981 struct Returner;
982 impl DaemonProgressReturner for Returner {
983 type T = Vec<String>;
984 async fn result<
985 E: From<Error> + From<std::io::Error> + Send + Sync,
986 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
987 >(
988 self,
989 store: &mut DaemonStore<C>,
990 ) -> Result<Self::T, E> {
991 Ok(wire::read_strings(&mut store.conn)
992 .collect::<Result<Vec<_>>>()
993 .await?)
994 }
995 }
996
997 DaemonProgress::new(self, Caller { paths }, Returner)
998 }
999
1000 #[instrument(skip(self))]
1001 fn query_valid_derivers<S: AsRef<str> + Send + Sync + Debug>(
1002 &mut self,
1003 path: S,
1004 ) -> impl Progress<T = Vec<String>, Error = Self::Error> {
1005 struct Caller<S: AsRef<str> + Send + Sync + Debug> {
1006 path: S,
1007 }
1008 impl<S: AsRef<str> + Send + Sync + Debug> DaemonProgressCaller for Caller<S> {
1009 async fn call<
1010 E: From<Error> + From<std::io::Error> + Send + Sync,
1011 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
1012 >(
1013 self,
1014 store: &mut DaemonStore<C>,
1015 ) -> Result<(), E> {
1016 wire::write_op(&mut store.conn, wire::Op::QueryValidDerivers)
1017 .await
1018 .with_field("QueryValidDerivers.<op>")?;
1019 wire::write_string(&mut store.conn, &self.path)
1020 .await
1021 .with_field("QueryValidDerivers.path")?;
1022 Ok(())
1023 }
1024 }
1025
1026 struct Returner;
1027 impl DaemonProgressReturner for Returner {
1028 type T = Vec<String>;
1029 async fn result<
1030 E: From<Error> + From<std::io::Error> + Send + Sync,
1031 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
1032 >(
1033 self,
1034 store: &mut DaemonStore<C>,
1035 ) -> Result<Self::T, E> {
1036 Ok(wire::read_strings(&mut store.conn)
1037 .collect::<Result<Vec<String>>>()
1038 .await
1039 .with_field("QueryValidDerivers.paths")?)
1040 }
1041 }
1042
1043 DaemonProgress::new(self, Caller { path }, Returner)
1044 }
1045
1046 #[instrument(skip(self))]
1047 fn query_missing<Ps>(
1048 &mut self,
1049 paths: Ps,
1050 ) -> impl Progress<T = crate::Missing, Error = Self::Error>
1051 where
1052 Ps: IntoIterator + Send + Debug,
1053 Ps::IntoIter: ExactSizeIterator + Send,
1054 Ps::Item: AsRef<str> + Send + Sync,
1055 {
1056 struct Caller<Ps>
1057 where
1058 Ps: IntoIterator + Send + Debug,
1059 Ps::IntoIter: ExactSizeIterator + Send,
1060 Ps::Item: AsRef<str> + Send + Sync,
1061 {
1062 paths: Ps,
1063 }
1064 impl<Ps> DaemonProgressCaller for Caller<Ps>
1065 where
1066 Ps: IntoIterator + Send + Debug,
1067 Ps::IntoIter: ExactSizeIterator + Send,
1068 Ps::Item: AsRef<str> + Send + Sync,
1069 {
1070 async fn call<
1071 E: From<Error> + From<std::io::Error> + Send + Sync,
1072 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
1073 >(
1074 self,
1075 store: &mut DaemonStore<C>,
1076 ) -> Result<(), E> {
1077 wire::write_op(&mut store.conn, wire::Op::QueryMissing)
1078 .await
1079 .with_field("QueryMissing.<op>")?;
1080 wire::write_strings(&mut store.conn, self.paths)
1081 .await
1082 .with_field("QueryMissing.paths")?;
1083 Ok(())
1084 }
1085 }
1086
1087 struct Returner;
1088 impl DaemonProgressReturner for Returner {
1089 type T = crate::Missing;
1090 async fn result<
1091 E: From<Error> + From<std::io::Error> + Send + Sync,
1092 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
1093 >(
1094 self,
1095 store: &mut DaemonStore<C>,
1096 ) -> Result<Self::T, E> {
1097 let will_build = wire::read_strings(&mut store.conn)
1098 .collect::<Result<Vec<String>>>()
1099 .await
1100 .with_field("QueryMissing.will_build")?;
1101 let will_substitute = wire::read_strings(&mut store.conn)
1102 .collect::<Result<Vec<String>>>()
1103 .await
1104 .with_field("QueryMissing.will_substitute")?;
1105 let unknown = wire::read_strings(&mut store.conn)
1106 .collect::<Result<Vec<String>>>()
1107 .await
1108 .with_field("QueryMissing.unknown")?;
1109 let download_size = wire::read_u64(&mut store.conn)
1110 .await
1111 .with_field("QueryMissing.download_size")?;
1112 let nar_size = wire::read_u64(&mut store.conn)
1113 .await
1114 .with_field("QueryMissing.nar_size")?;
1115 Ok(crate::Missing {
1116 will_build,
1117 will_substitute,
1118 unknown,
1119 download_size,
1120 nar_size,
1121 })
1122 }
1123 }
1124
1125 DaemonProgress::new(self, Caller { paths }, Returner)
1126 }
1127
1128 #[instrument(skip(self))]
1129 fn query_derivation_output_map<P: AsRef<str> + Send + Sync + Debug>(
1130 &mut self,
1131 path: P,
1132 ) -> impl Progress<T = HashMap<String, String>, Error = Self::Error> {
1133 struct Caller<P: AsRef<str> + Send + Sync + Debug> {
1134 path: P,
1135 }
1136 impl<P: AsRef<str> + Send + Sync + Debug> DaemonProgressCaller for Caller<P> {
1137 async fn call<
1138 E: From<Error> + From<std::io::Error> + Send + Sync,
1139 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
1140 >(
1141 self,
1142 store: &mut DaemonStore<C>,
1143 ) -> Result<(), E> {
1144 wire::write_op(&mut store.conn, wire::Op::QueryDerivationOutputMap)
1145 .await
1146 .with_field("QueryDerivationOutputMap.<op>")?;
1147 wire::write_string(&mut store.conn, self.path)
1148 .await
1149 .with_field("QueryDerivationOutputMap.paths")?;
1150 Ok(())
1151 }
1152 }
1153
1154 struct Returner;
1155 impl DaemonProgressReturner for Returner {
1156 type T = HashMap<String, String>;
1157 async fn result<
1158 E: From<Error> + From<std::io::Error> + Send + Sync,
1159 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
1160 >(
1161 self,
1162 store: &mut DaemonStore<C>,
1163 ) -> Result<Self::T, E> {
1164 let mut outputs = HashMap::new();
1165 let count = wire::read_u64(&mut store.conn)
1166 .await
1167 .with_field("QueryDerivationOutputMap.outputs[].<count>")?;
1168 for _ in 0..count {
1169 let name = wire::read_string(&mut store.conn)
1170 .await
1171 .with_field("QueryDerivationOutputMap.outputs[].name")?;
1172 let path = wire::read_string(&mut store.conn)
1173 .await
1174 .with_field("QueryDerivationOutputMap.outputs[].path")?;
1175 outputs.insert(name, path);
1176 }
1177 Ok(outputs)
1178 }
1179 }
1180
1181 DaemonProgress::new(self, Caller { path }, Returner)
1182 }
1183
1184 #[instrument(skip(self))]
1185 fn build_paths_with_results<Ps>(
1186 &mut self,
1187 paths: Ps,
1188 mode: BuildMode,
1189 ) -> impl Progress<T = HashMap<String, crate::BuildResult>, Error = Self::Error>
1190 where
1191 Ps: IntoIterator + Send + Debug,
1192 Ps::IntoIter: ExactSizeIterator + Send,
1193 Ps::Item: AsRef<str> + Send + Sync,
1194 {
1195 struct Caller<Ps>
1196 where
1197 Ps: IntoIterator + Send + Debug,
1198 Ps::IntoIter: ExactSizeIterator + Send,
1199 Ps::Item: AsRef<str> + Send + Sync,
1200 {
1201 paths: Ps,
1202 mode: BuildMode,
1203 }
1204 impl<Ps> DaemonProgressCaller for Caller<Ps>
1205 where
1206 Ps: IntoIterator + Send + Debug,
1207 Ps::IntoIter: ExactSizeIterator + Send,
1208 Ps::Item: AsRef<str> + Send + Sync,
1209 {
1210 async fn call<
1211 E: From<Error> + From<std::io::Error> + Send + Sync,
1212 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
1213 >(
1214 self,
1215 store: &mut DaemonStore<C>,
1216 ) -> Result<(), E> {
1217 wire::write_op(&mut store.conn, wire::Op::BuildPathsWithResults)
1218 .await
1219 .with_field("BuildPathsWithResults.<op>")?;
1220 wire::write_strings(&mut store.conn, self.paths)
1221 .await
1222 .with_field("BuildPathsWithResults.paths")?;
1223 wire::write_build_mode(&mut store.conn, self.mode)
1224 .await
1225 .with_field("BuildPathsWithResults.build_mode")?;
1226 Ok(())
1227 }
1228 }
1229
1230 struct Returner;
1231 impl DaemonProgressReturner for Returner {
1232 type T = HashMap<String, crate::BuildResult>;
1233 async fn result<
1234 E: From<Error> + From<std::io::Error> + Send + Sync,
1235 C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
1236 >(
1237 self,
1238 store: &mut DaemonStore<C>,
1239 ) -> Result<Self::T, E> {
1240 let count = wire::read_u64(&mut store.conn)
1241 .await
1242 .with_field("BuildPathsWithResults.results.<count>")?;
1243 let mut results = HashMap::with_capacity(count as usize);
1244 for _ in 0..count {
1245 let path = wire::read_string(&mut store.conn)
1246 .await
1247 .with_field("BuildPathsWithResults.results[].path")?;
1248 let result = wire::read_build_result(&mut store.conn, store.proto)
1249 .await
1250 .with_field("BuildPathsWithResults.results[].result")?;
1251 results.insert(path, result);
1252 }
1253 Ok(results)
1254 }
1255 }
1256 DaemonProgress::new(self, Caller { paths, mode }, Returner)
1257 }
1258}
1259
/// Builder for `DaemonProtocolAdapter`; its fields are passed to the adapter's
/// handshake by `adopt`.
#[derive(Debug)]
pub struct DaemonProtocolAdapterBuilder<'s, S: Store> {
    /// Store the adapter will borrow.
    pub store: &'s mut S,
    /// Version string handed to the handshake; defaults to
    /// `"gorgon/nix-daemon "` followed by this crate's version.
    pub nix_version: String,
    /// Trust value handed to the handshake; defaults to `None`.
    pub remote_trust: Option<bool>,
}
1266
1267impl<'s, S: Store> DaemonProtocolAdapterBuilder<'s, S> {
1268 fn new(store: &'s mut S) -> Self {
1269 Self {
1270 store,
1271 nix_version: concat!("gorgon/nix-daemon ", env!("CARGO_PKG_VERSION")).to_string(),
1272 remote_trust: None,
1273 }
1274 }
1275
1276 pub async fn adopt<
1282 R: AsyncReadExt + Unpin + Send + Debug,
1283 W: AsyncWriteExt + Unpin + Send + Debug,
1284 >(
1285 self,
1286 r: R,
1287 w: W,
1288 ) -> Result<DaemonProtocolAdapter<'s, S, R, W>> {
1289 DaemonProtocolAdapter::handshake(r, w, self.store, self.nix_version, self.remote_trust)
1290 .await
1291 }
1292}
1293
1294pub struct DaemonProtocolAdapter<'s, S: Store, R, W>
1323where
1324 R: AsyncReadExt + Unpin + Send + Debug,
1325 W: AsyncWriteExt + Unpin + Send + Debug,
1326{
1327 r: R,
1328 w: W,
1329 store: &'s mut S,
1330 pub proto: Proto,
1332}
1333
1334impl<'s, S: Store>
1335 DaemonProtocolAdapter<'s, S, tokio::net::unix::OwnedReadHalf, tokio::net::unix::OwnedWriteHalf>
1336{
1337 pub fn builder(store: &'s mut S) -> DaemonProtocolAdapterBuilder<'s, S> {
1338 DaemonProtocolAdapterBuilder::new(store)
1339 }
1340}
1341
1342impl<'s, S: Store, R, W> DaemonProtocolAdapter<'s, S, R, W>
1343where
1344 R: AsyncReadExt + Unpin + Send + Debug,
1345 W: AsyncWriteExt + Unpin + Send + Debug,
1346{
1347 #[instrument(skip(r, w, store))]
1348 async fn handshake(
1349 mut r: R,
1350 mut w: W,
1351 store: &'s mut S,
1352 nix_version: String,
1353 remote_trust: Option<bool>,
1354 ) -> Result<Self> {
1355 match wire::read_u64(&mut r).await {
1357 Ok(magic1 @ wire::WORKER_MAGIC_1) => Ok(magic1),
1358 Ok(v) => Err(Error::Invalid(format!("{:#x}", v))),
1359 Err(err) => Err(err.into()),
1360 }
1361 .with_field("magic1")?;
1362 wire::write_u64(&mut w, wire::WORKER_MAGIC_2)
1363 .await
1364 .with_field("magic2")?;
1365
1366 wire::write_proto(&mut w, MAX_PROTO)
1368 .await
1369 .with_field("daemon_proto")?;
1370 let proto = wire::read_proto(&mut r)
1371 .await
1372 .and_then(|proto| {
1373 if proto.0 != 1 || proto < MIN_PROTO {
1374 return Err(Error::Invalid(format!("{}", proto)));
1375 }
1376 Ok(proto)
1377 })
1378 .with_field("client_proto")?;
1379
1380 if proto >= Proto(1, 14) {
1382 wire::read_u64(&mut r)
1383 .await
1384 .with_field("__obsolete_cpu_affinity")?;
1385 }
1386 if proto >= Proto(1, 11) {
1387 wire::read_bool(&mut r)
1388 .await
1389 .with_field("__obsolete_reserve_space")?;
1390 }
1391
1392 if proto >= Proto(1, 33) {
1394 wire::write_string(&mut w, &nix_version)
1395 .await
1396 .with_field("nix_version")?;
1397 }
1398 if proto >= Proto(1, 35) {
1399 wire::write_u64(
1400 &mut w,
1401 match remote_trust {
1402 None => 0,
1403 Some(true) => 1,
1404 Some(false) => 2,
1405 },
1406 )
1407 .await
1408 .with_field("remote_trust")?;
1409 }
1410
1411 wire::write_stderr(&mut w, None)
1413 .await
1414 .with_field("stderr")?;
1415 Ok(Self { r, w, store, proto })
1416 }
1417
1418 pub async fn run(&mut self) -> Result<(), S::Error> {
1420 loop {
1421 match wire::read_op(&mut self.r).await {
1422 Ok(wire::Op::IsValidPath) => {
1423 let path = wire::read_string(&mut self.r)
1424 .await
1425 .with_field("IsValidPath.path")?;
1426
1427 let is_valid =
1428 forward_stderr(&mut self.w, self.store.is_valid_path(path)).await?;
1429 wire::write_bool(&mut self.w, is_valid)
1430 .await
1431 .with_field("IsValidPath.is_valid")?;
1432 }
1433 Ok(wire::Op::HasSubstitutes) => {
1434 let path = wire::read_string(&mut self.r)
1435 .await
1436 .with_field("HasSubstitutes.path")?;
1437 let has_substitutes =
1438 forward_stderr(&mut self.w, self.store.has_substitutes(path)).await?;
1439 wire::write_bool(&mut self.w, has_substitutes)
1440 .await
1441 .with_field("HasSubstitutes.has_substitutes")?;
1442 }
1443 Ok(wire::Op::AddToStore) => match self.proto {
1444 Proto(1, 25..) => {
1445 let name = wire::read_string(&mut self.r)
1446 .await
1447 .with_field("AddToStore.name")?;
1448 let cam_str = wire::read_string(&mut self.r)
1449 .await
1450 .with_field("AddToStore.camStr")?;
1451 let refs = wire::read_strings(&mut self.r)
1452 .collect::<Result<Vec<_>>>()
1453 .await
1454 .with_field("AddToStore.refs")?;
1455 let repair = wire::read_bool(&mut self.r)
1456 .await
1457 .with_field("AddToStore.repair")?;
1458 let source = wire::FramedReader::new(&mut self.r);
1459
1460 let (name, pi) = forward_stderr(
1461 &mut self.w,
1462 self.store.add_to_store(name, cam_str, refs, repair, source),
1463 )
1464 .await?;
1465 wire::write_string(&mut self.w, name)
1466 .await
1467 .with_field("AddToStore.name")?;
1468 wire::write_pathinfo(&mut self.w, self.proto, &pi)
1469 .await
1470 .with_field("AddToStore.pi")?;
1471 }
1472 _ => {
1473 return Err(Error::Invalid(format!(
1474 "AddToStore is not implemented for Protocol {}",
1475 self.proto
1476 ))
1477 .into())
1478 }
1479 },
1480 Ok(wire::Op::BuildPaths) => {
1481 let paths = wire::read_strings(&mut self.r)
1482 .collect::<Result<Vec<_>>>()
1483 .await
1484 .with_field("BuildPaths.paths")?;
1485 let mode = if self.proto >= Proto(1, 15) {
1486 wire::read_build_mode(&mut self.r)
1487 .await
1488 .with_field("BuildPaths.build_mode")?
1489 } else {
1490 BuildMode::Normal
1491 };
1492
1493 forward_stderr(&mut self.w, self.store.build_paths(paths, mode)).await?;
1494 wire::write_u64(&mut self.w, 1)
1495 .await
1496 .with_field("BuildPaths.__unused__")?;
1497 }
1498 Ok(wire::Op::EnsurePath) => {
1499 let path = wire::read_string(&mut self.r)
1500 .await
1501 .with_field("EnsurePath.path")?;
1502 forward_stderr(&mut self.w, self.store.ensure_path(path)).await?;
1503 wire::write_u64(&mut self.w, 1)
1504 .await
1505 .with_field("EnsurePath.__unused__")?;
1506 }
1507 Ok(wire::Op::AddTempRoot) => {
1508 let path = wire::read_string(&mut self.r)
1509 .await
1510 .with_field("AddTempRoot.path")?;
1511
1512 forward_stderr(&mut self.w, self.store.add_temp_root(path)).await?;
1513 wire::write_u64(&mut self.w, 1)
1514 .await
1515 .with_field("AddTempRoot.__unused__")?;
1516 }
1517 Ok(wire::Op::AddIndirectRoot) => {
1518 let path = wire::read_string(&mut self.r)
1519 .await
1520 .with_field("AddIndirectRoot.path")?;
1521
1522 forward_stderr(&mut self.w, self.store.add_indirect_root(path)).await?;
1523 wire::write_u64(&mut self.w, 1)
1524 .await
1525 .with_field("AddIndirectRoot.__unused__")?;
1526 }
1527 Ok(wire::Op::FindRoots) => {
1528 let roots = forward_stderr(&mut self.w, self.store.find_roots()).await?;
1529
1530 wire::write_u64(&mut self.w, roots.len() as u64)
1531 .await
1532 .with_field("FindRoots.roots[].<count>")?;
1533 for (link, target) in roots {
1534 wire::write_string(&mut self.w, link)
1535 .await
1536 .with_field("FindRoots.roots[].link")?;
1537 wire::write_string(&mut self.w, target)
1538 .await
1539 .with_field("FindRoots.roots[].target")?;
1540 }
1541 }
1542 Ok(wire::Op::SetOptions) => {
1543 let ops = wire::read_client_settings(&mut self.r, self.proto)
1544 .await
1545 .with_field("SetOptions.clientSettings")?;
1546 forward_stderr(&mut self.w, self.store.set_options(ops)).await?;
1547 }
1548 Ok(wire::Op::QueryPathInfo) => {
1549 let path = wire::read_string(&mut self.r)
1550 .await
1551 .with_field("QueryPathInfo.path")?;
1552
1553 let pi = forward_stderr(&mut self.w, self.store.query_pathinfo(path)).await?;
1554
1555 wire::write_bool(&mut self.w, pi.is_some())
1556 .await
1557 .with_field("QueryPathInfo.is_valid")?;
1558 if let Some(pi) = pi {
1559 wire::write_pathinfo(&mut self.w, self.proto, &pi)
1560 .await
1561 .with_field("QueryPathInfo.path_info")?;
1562 }
1563 }
1564 Ok(wire::Op::QueryValidPaths) => match self.proto {
1565 Proto(1, 12..) => {
1566 let paths = wire::read_strings(&mut self.r)
1567 .collect::<Result<Vec<_>>>()
1568 .await
1569 .with_field("QueryValidPaths.path")?;
1570 let use_substituters = if self.proto >= Proto(1, 27) {
1571 wire::read_bool(&mut self.r)
1572 .await
1573 .with_field("QueryValidPaths.use_substituters")?
1574 } else {
1575 true
1576 };
1577
1578 let valid_paths = forward_stderr(
1579 &mut self.w,
1580 self.store.query_valid_paths(paths, use_substituters),
1581 )
1582 .await?;
1583
1584 wire::write_strings(&mut self.w, valid_paths)
1585 .await
1586 .with_field("QueryValidPaths.valid_path")?;
1587 }
1588 _ => {
1589 return Err(Error::Invalid(format!(
1590 "QueryValidPaths is not implemented for Protocol {}",
1591 self.proto
1592 ))
1593 .into())
1594 }
1595 },
1596 Ok(wire::Op::QuerySubstitutablePaths) => {
1597 let paths = wire::read_strings(&mut self.r)
1598 .collect::<Result<Vec<_>>>()
1599 .await
1600 .with_field("QuerySubstitutablePaths.paths")?;
1601 let sub_paths =
1602 forward_stderr(&mut self.w, self.store.query_substitutable_paths(paths))
1603 .await?;
1604 wire::write_strings(&mut self.w, sub_paths)
1605 .await
1606 .with_field("QuerySubstitutablePaths.sub_paths")?;
1607 }
1608 Ok(wire::Op::QueryMissing) => {
1609 let paths = wire::read_strings(&mut self.r)
1610 .collect::<Result<Vec<_>>>()
1611 .await
1612 .with_field("QueryMissing.paths")?;
1613
1614 let crate::Missing {
1615 will_build,
1616 will_substitute,
1617 unknown,
1618 download_size,
1619 nar_size,
1620 } = forward_stderr(&mut self.w, self.store.query_missing(paths)).await?;
1621
1622 wire::write_strings(&mut self.w, will_build)
1623 .await
1624 .with_field("QueryMissing.will_build")?;
1625 wire::write_strings(&mut self.w, will_substitute)
1626 .await
1627 .with_field("QueryMissing.will_substitute")?;
1628 wire::write_strings(&mut self.w, unknown)
1629 .await
1630 .with_field("QueryMissing.unknown")?;
1631 wire::write_u64(&mut self.w, download_size)
1632 .await
1633 .with_field("QueryMissing.download_size")?;
1634 wire::write_u64(&mut self.w, nar_size)
1635 .await
1636 .with_field("QueryMissing.nar_size")?;
1637 }
1638 Ok(wire::Op::QueryValidDerivers) => {
1639 let path = wire::read_string(&mut self.r)
1640 .await
1641 .with_field("QueryValidDerivers.path")?;
1642
1643 let derivers =
1644 forward_stderr(&mut self.w, self.store.query_valid_derivers(path)).await?;
1645 wire::write_strings(&mut self.w, derivers)
1646 .await
1647 .with_field("QueryValidDerivers.paths")?
1648 }
1649 Ok(wire::Op::QueryDerivationOutputMap) => {
1650 let path = wire::read_string(&mut self.r)
1651 .await
1652 .with_field("QueryDerivationOutputMap.paths")?;
1653
1654 let outputs =
1655 forward_stderr(&mut self.w, self.store.query_derivation_output_map(path))
1656 .await?;
1657 wire::write_u64(&mut self.w, outputs.len() as u64)
1658 .await
1659 .with_field("QueryDerivationOutputMap.outputs[].<count>")?;
1660 for (name, path) in outputs {
1661 wire::write_string(&mut self.w, name)
1662 .await
1663 .with_field("QueryDerivationOutputMap.outputs[].name")?;
1664 wire::write_string(&mut self.w, path)
1665 .await
1666 .with_field("QueryDerivationOutputMap.outputs[].path")?;
1667 }
1668 }
1669 Ok(wire::Op::BuildPathsWithResults) => {
1670 let paths = wire::read_strings(&mut self.r)
1671 .collect::<Result<Vec<_>>>()
1672 .await
1673 .with_field("BuildPathsWithResults.paths")?;
1674 let mode = wire::read_build_mode(&mut self.r)
1675 .await
1676 .with_field("BuildPathsWithResults.build_mode")?;
1677
1678 let results = forward_stderr(
1679 &mut self.w,
1680 self.store.build_paths_with_results(paths, mode),
1681 )
1682 .await?;
1683
1684 wire::write_u64(&mut self.w, results.len() as u64)
1685 .await
1686 .with_field("BuildPathsWithResults.results.<count>")?;
1687 for (path, result) in results {
1688 wire::write_string(&mut self.w, path)
1689 .await
1690 .with_field("BuildPathsWithResults.results[].path")?;
1691 wire::write_build_result(&mut self.w, &result, self.proto)
1692 .await
1693 .with_field("BuildPathsWithResults.results[].result")?;
1694 }
1695 }
1696 Ok(v) => todo!("{:#?}", v),
1697
1698 Err(Error::IO(err)) if err.kind() == std::io::ErrorKind::UnexpectedEof => {
1699 info!("Client disconnected");
1700 return Ok(());
1701 }
1702 Err(err) => return Err(err.into()),
1703 }
1704 }
1705 }
1706}
1707
1708async fn forward_stderr<W: AsyncWriteExt + Unpin, P: Progress>(
1709 w: &mut W,
1710 mut prog: P,
1711) -> Result<P::T, P::Error> {
1712 while let Some(stderr) = prog.next().await? {
1713 wire::write_stderr(w, Some(stderr)).await?;
1714 }
1715 wire::write_stderr(w, None).await?;
1716 prog.result().await
1717}
1718
#[cfg(test)]
mod tests {
    use super::*;

    /// `Proto` must order by its first component, then by its second.
    #[test]
    fn test_version_ord() {
        // Each pair is (greater, lesser).
        let ordered_pairs = [
            (Proto(0, 1), Proto(0, 0)),
            (Proto(1, 0), Proto(0, 0)),
            (Proto(1, 0), Proto(0, 1)),
            (Proto(1, 1), Proto(1, 0)),
        ];
        for (greater, lesser) in ordered_pairs {
            assert!(
                greater > lesser,
                "{} should be greater than {}",
                greater,
                lesser
            );
        }
    }
}