nix_daemon/nix/
mod.rs

1// SPDX-FileCopyrightText: 2024 embr <git@liclac.eu>
2//
3// SPDX-License-Identifier: EUPL-1.2
4
5//! Interfaces to nix-daemon (or compatible) Stores.
6//! ------------------------------------------------
7//!
8//! This module currently implements support for Protocol 1.35, and Nix 2.15+.
9//!
10//! Support for older versions will be added in the future - in particular, Protocol 1.21
11//! used by Nix 2.3.
12
13pub mod wire;
14
15use crate::{
16    BuildMode, ClientSettings, Error, PathInfo, Progress, Result, ResultExt, Stderr, Store,
17};
18use std::future::Future;
19use std::{collections::HashMap, fmt::Debug};
20use tokio::{
21    io::{AsyncReadExt, AsyncWriteExt},
22    net::UnixStream,
23};
24use tokio_stream::StreamExt;
25use tracing::{info, instrument};
26
27/// Minimum supported protocol version. Older versions will be rejected.
28///
29/// Protocol 1.35 was introduced in Nix 2.15:
30/// https://github.com/NixOS/nix/blob/2.15.0/src/libstore/worker-protocol.hh#L13
31///
32/// TODO: Support Protocol 1.21, used by Nix 2.3.
33const MIN_PROTO: Proto = Proto(1, 35); // Nix >= 2.15.x
34
35/// Maxmimum supported protocol version. Newer daemons will run in compatibility mode.
36///
37/// Protocol 1.35 is current as of Nix 2.19:
38/// https://github.com/NixOS/nix/blob/2.19.3/src/libstore/worker-protocol.hh#L12
39const MAX_PROTO: Proto = Proto(1, 35); // Nix <= 2.19.x
40
41/// Protocol version.
42#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
43pub struct Proto(u8, u8);
44
45impl From<u64> for Proto {
46    fn from(raw: u64) -> Self {
47        Self(((raw & 0xFF00) >> 8) as u8, (raw & 0x00FF) as u8)
48    }
49}
50impl From<Proto> for u64 {
51    fn from(v: Proto) -> Self {
52        ((v.0 as u64) << 8) | (v.1 as u64)
53    }
54}
55
56impl std::fmt::Display for Proto {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        write!(f, "{}.{}", self.0, self.1)
59    }
60}
61
62impl Proto {
63    fn since(&self, v: u8) -> bool {
64        self.1 >= v
65    }
66}
67
68trait DaemonProgressCaller {
69    fn call<
70        E: From<Error> + From<std::io::Error> + Send + Sync,
71        C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
72    >(
73        self,
74        store: &mut DaemonStore<C>,
75    ) -> impl Future<Output = Result<(), E>> + Send;
76}
77
78trait DaemonProgressReturner {
79    type T: Send;
80    fn result<
81        E: From<Error> + From<std::io::Error> + Send + Sync,
82        C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
83    >(
84        self,
85        store: &mut DaemonStore<C>,
86    ) -> impl Future<Output = Result<Self::T, E>> + Send;
87}
88
89/// Internal [`crate::Progress`] implementation used by [`DaemonStore`].
90struct DaemonProgress<'s, PC, PR, T: Send, C>
91where
92    C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
93    PC: DaemonProgressCaller + Send,
94    PR: DaemonProgressReturner<T = T> + Send,
95{
96    store: &'s mut DaemonStore<C>,
97    fuse: bool,
98    caller: Option<PC>,
99    returner: PR,
100}
101impl<'s, PC, PR, T: Send, C> DaemonProgress<'s, PC, PR, T, C>
102where
103    C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
104    PC: DaemonProgressCaller + Send,
105    PR: DaemonProgressReturner<T = T> + Send,
106{
107    fn new(store: &'s mut DaemonStore<C>, caller: PC, returner: PR) -> Self {
108        Self {
109            store,
110            fuse: false,
111            caller: Some(caller),
112            returner,
113        }
114    }
115}
116impl<'s, PC, PR, T: Send, C> Progress for DaemonProgress<'s, PC, PR, T, C>
117where
118    C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
119    PC: DaemonProgressCaller + Send,
120    PR: DaemonProgressReturner<T = T> + Send,
121{
122    type T = T;
123    type Error = Error;
124
125    async fn next(&mut self) -> Result<Option<Stderr>> {
126        let store = &mut self.store;
127        if let Some(caller) = self.caller.take() {
128            caller.call::<Error, C>(store).await?;
129        }
130        if self.fuse {
131            Ok(None)
132        } else {
133            match wire::read_stderr(&mut store.conn).await? {
134                Some(Stderr::Error(err)) => Err(Error::NixError(err)),
135                Some(stderr) => Ok(Some(stderr)),
136                None => {
137                    self.fuse = true;
138                    Ok(None)
139                }
140            }
141        }
142    }
143
144    async fn result(mut self) -> Result<Self::T> {
145        while let Some(_) = self.next().await? {}
146        self.returner.result(self.store).await
147    }
148}
149
150#[derive(Debug, Default)]
151pub struct DaemonStoreBuilder {
152    // This will do things in the future.
153}
154
155impl DaemonStoreBuilder {
156    /// Initializes a [`DaemonStore`] by adopting a connection.
157    ///
158    /// It's up to the caller that the connection is in a state to begin a nix handshake, eg.
159    /// it behaves like a fresh connection to the daemon socket - if this is a connection through
160    /// a proxy, any proxy handshakes should already have taken place, etc.
161    ///
162    /// ```no_run
163    /// use tokio::net::UnixStream;
164    /// use nix_daemon::nix::DaemonStore;
165    ///
166    /// # async {
167    /// let conn = UnixStream::connect("/nix/var/nix/daemon-socket/socket").await?;
168    /// let store = DaemonStore::builder().init(conn).await?;
169    /// # Ok::<_, nix_daemon::Error>(())
170    /// # };
171    /// ```
172    pub async fn init<C: AsyncReadExt + AsyncWriteExt + Unpin>(
173        self,
174        conn: C,
175    ) -> Result<DaemonStore<C>> {
176        let mut store = DaemonStore {
177            conn,
178            buffer: [0u8; 1024],
179            proto: Proto(0, 0),
180        };
181        store.handshake().await?;
182        Ok(store)
183    }
184
185    /// Connects to a Nix daemon via a unix socket.
186    /// The path is usually `/nix/var/nix/daemon-socket/socket`.
187    ///
188    /// ```no_run
189    /// use nix_daemon::{Store, Progress, nix::DaemonStore};
190    ///
191    /// # async {
192    /// let store = DaemonStore::builder()
193    ///     .connect_unix("/nix/var/nix/daemon-socket/socket")
194    ///     .await?;
195    /// # Ok::<_, nix_daemon::Error>(())
196    /// # };
197    /// ```
198    pub async fn connect_unix<P: AsRef<std::path::Path>>(
199        self,
200        path: P,
201    ) -> Result<DaemonStore<UnixStream>> {
202        self.init(UnixStream::connect(path).await?).await
203    }
204}
205
206/// Store backed by a `nix-daemon` (or compatible store). Implements [`crate::Store`].
207///
208/// ```no_run
209/// use nix_daemon::{Store, Progress, nix::DaemonStore};
210///
211/// # async {
212/// let mut store = DaemonStore::builder()
213///     .connect_unix("/nix/var/nix/daemon-socket/socket")
214///     .await?;
215///
216/// let is_valid_path = store.is_valid_path("/nix/store/...").result().await?;
217/// # Ok::<_, nix_daemon::Error>(())
218/// # };
219/// ```
220#[derive(Debug)]
221pub struct DaemonStore<C: AsyncReadExt + AsyncWriteExt + Unpin> {
222    conn: C,
223    buffer: [u8; 1024],
224    /// Negotiated protocol version.
225    pub proto: Proto,
226}
227
228impl DaemonStore<UnixStream> {
229    /// Returns a Builder.
230    pub fn builder() -> DaemonStoreBuilder {
231        DaemonStoreBuilder::default()
232    }
233}
234
235impl<C: AsyncReadExt + AsyncWriteExt + Unpin> DaemonStore<C> {
236    #[instrument(skip(self))]
237    async fn handshake(&mut self) -> Result<()> {
238        // Exchange magic numbers.
239        wire::write_u64(&mut self.conn, wire::WORKER_MAGIC_1)
240            .await
241            .with_field("magic1")?;
242        match wire::read_u64(&mut self.conn).await {
243            Ok(magic2 @ wire::WORKER_MAGIC_2) => Ok(magic2),
244            Ok(v) => Err(Error::Invalid(format!("{:#x}", v))),
245            Err(err) => Err(err.into()),
246        }
247        .with_field("magic2")?;
248
249        // Check that we're talking to a new enough daemon, tell them our version.
250        self.proto = wire::read_proto(&mut self.conn)
251            .await
252            .and_then(|proto| {
253                if proto.0 != 1 || proto < MIN_PROTO {
254                    return Err(Error::Invalid(format!("{}", proto)));
255                }
256                Ok(proto)
257            })
258            .with_field("daemon_proto")?;
259        wire::write_proto(&mut self.conn, MAX_PROTO)
260            .await
261            .with_field("client_proto")?;
262
263        // Write some obsolete fields.
264        if self.proto >= Proto(1, 14) {
265            wire::write_u64(&mut self.conn, 0)
266                .await
267                .with_field("__obsolete_cpu_affinity")?;
268        }
269        if self.proto >= Proto(1, 11) {
270            wire::write_bool(&mut self.conn, false)
271                .await
272                .with_field("__obsolete_reserve_space")?;
273        }
274
275        // And we don't currently do anything with these.
276        if self.proto >= Proto(1, 33) {
277            wire::read_string(&mut self.conn)
278                .await
279                .with_field("nix_version")?;
280        }
281        if self.proto >= Proto(1, 35) {
282            // Option<bool>: 0 = None, 1 = Some(true), 2 = Some(false)
283            wire::read_u64(&mut self.conn)
284                .await
285                .with_field("remote_trust")?;
286        }
287
288        // Discard Stderr. There shouldn't be anything here anyway.
289        while let Some(_) = wire::read_stderr(&mut self.conn).await? {}
290        Ok(())
291    }
292}
293
294impl<C: AsyncReadExt + AsyncWriteExt + Unpin + Send> Store for DaemonStore<C> {
295    type Error = Error;
296
297    #[instrument(skip(self))]
298    fn is_valid_path<S: AsRef<str> + Send + Sync + Debug>(
299        &mut self,
300        path: S,
301    ) -> impl Progress<T = bool, Error = Self::Error> {
302        // We have to do this silly verbose thing, because using two closures and passing
303        // &mut store to both, summons incomprehensible horrors into your lifetimes that
304        // have already taken days of my life.
305        //
306        // Also I don't think I can match all the type constraints some of the later ops
307        // (AddToStore, I'm looking at you) with `macro_rules` in a way that lets me
308        // synthesize these, but this is quite likely a skill issue.
309        //
310        // If you can figure it out, please, share with me your wisdom.
311        struct Caller<S: AsRef<str> + Send + Sync + Debug> {
312            path: S,
313        }
314        impl<S: AsRef<str> + Send + Sync + Debug> DaemonProgressCaller for Caller<S> {
315            async fn call<
316                E: From<Error> + From<std::io::Error> + Send + Sync,
317                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
318            >(
319                self,
320                store: &mut DaemonStore<C>,
321            ) -> Result<(), E> {
322                wire::write_op(&mut store.conn, wire::Op::IsValidPath)
323                    .await
324                    .with_field("IsValidPath.<op>")?;
325                wire::write_string(&mut store.conn, &self.path)
326                    .await
327                    .with_field("IsValidPath.path")?;
328                Ok(())
329            }
330        }
331
332        struct Returner;
333        impl DaemonProgressReturner for Returner {
334            type T = bool;
335            async fn result<
336                E: From<Error> + From<std::io::Error> + Send + Sync,
337                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
338            >(
339                self,
340                store: &mut DaemonStore<C>,
341            ) -> Result<Self::T, E> {
342                Ok(wire::read_bool(&mut store.conn).await?)
343            }
344        }
345
346        DaemonProgress::new(self, Caller { path }, Returner)
347    }
348
349    #[instrument(skip(self))]
350    fn has_substitutes<P: AsRef<str> + Send + Sync + Debug>(
351        &mut self,
352        path: P,
353    ) -> impl Progress<T = bool, Error = Self::Error> {
354        struct Caller<P: AsRef<str> + Send + Sync + Debug> {
355            path: P,
356        }
357        impl<P: AsRef<str> + Send + Sync + Debug> DaemonProgressCaller for Caller<P> {
358            async fn call<
359                E: From<Error> + From<std::io::Error> + Send + Sync,
360                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
361            >(
362                self,
363                store: &mut DaemonStore<C>,
364            ) -> Result<(), E> {
365                wire::write_op(&mut store.conn, wire::Op::HasSubstitutes)
366                    .await
367                    .with_field("HasSubstitutes.<op>")?;
368                wire::write_string(&mut store.conn, &self.path)
369                    .await
370                    .with_field("HasSubstitutes.path")?;
371                Ok(())
372            }
373        }
374
375        struct Returner;
376        impl DaemonProgressReturner for Returner {
377            type T = bool;
378            async fn result<
379                E: From<Error> + From<std::io::Error> + Send + Sync,
380                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
381            >(
382                self,
383                store: &mut DaemonStore<C>,
384            ) -> Result<Self::T, E> {
385                Ok(wire::read_bool(&mut store.conn).await?)
386            }
387        }
388
389        DaemonProgress::new(self, Caller { path }, Returner)
390    }
391
392    #[instrument(skip(self, source))]
393    fn add_to_store<
394        SN: AsRef<str> + Send + Sync + Debug,
395        SC: AsRef<str> + Send + Sync + Debug,
396        Refs,
397        R,
398    >(
399        &mut self,
400        name: SN,
401        cam_str: SC,
402        refs: Refs,
403        repair: bool,
404        source: R,
405    ) -> impl Progress<T = (String, PathInfo), Error = Self::Error>
406    where
407        Refs: IntoIterator + Send + Debug,
408        Refs::IntoIter: ExactSizeIterator + Send,
409        Refs::Item: AsRef<str> + Send + Sync,
410        R: AsyncReadExt + Unpin + Send + Debug,
411    {
412        struct Caller<
413            SN: AsRef<str> + Send + Sync + Debug,
414            SC: AsRef<str> + Send + Sync + Debug,
415            Refs,
416            R,
417        >
418        where
419            Refs: IntoIterator + Send + Debug,
420            Refs::IntoIter: ExactSizeIterator + Send,
421            Refs::Item: AsRef<str> + Send + Sync,
422            R: AsyncReadExt + Unpin + Send + Debug,
423        {
424            name: SN,
425            cam_str: SC,
426            refs: Refs,
427            repair: bool,
428            source: R,
429        }
430        impl<
431                SN: AsRef<str> + Send + Sync + Debug,
432                SC: AsRef<str> + Send + Sync + Debug,
433                Refs,
434                R,
435            > DaemonProgressCaller for Caller<SN, SC, Refs, R>
436        where
437            Refs: IntoIterator + Send + Debug,
438            Refs::IntoIter: ExactSizeIterator + Send,
439            Refs::Item: AsRef<str> + Send + Sync,
440            R: AsyncReadExt + Unpin + Send + Debug,
441        {
442            async fn call<
443                E: From<Error> + From<std::io::Error> + Send + Sync,
444                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
445            >(
446                mut self,
447                store: &mut DaemonStore<C>,
448            ) -> Result<(), E> {
449                match store.proto {
450                    Proto(1, 25..) => {
451                        wire::write_op(&mut store.conn, wire::Op::AddToStore)
452                            .await
453                            .with_field("AddToStore.<op>")?;
454                        wire::write_string(&mut store.conn, self.name)
455                            .await
456                            .with_field("AddToStore.name")?;
457                        wire::write_string(&mut store.conn, self.cam_str)
458                            .await
459                            .with_field("AddToStore.camStr")?;
460                        wire::write_strings(&mut store.conn, self.refs)
461                            .await
462                            .with_field("AddToStore.refs")?;
463                        wire::write_bool(&mut store.conn, self.repair)
464                            .await
465                            .with_field("AddToStore.repair")?;
466                        wire::copy_to_framed(&mut self.source, &mut store.conn, &mut store.buffer)
467                            .await
468                            .with_field("AddToStore.<source>")?;
469                        Ok(())
470                    }
471                    proto => Err(Error::Invalid(format!(
472                        "AddToStore is not implemented for Protocol {}",
473                        proto
474                    ))
475                    .into()),
476                }
477            }
478        }
479
480        struct Returner;
481        impl DaemonProgressReturner for Returner {
482            type T = (String, PathInfo);
483            async fn result<
484                E: From<Error> + From<std::io::Error> + Send + Sync,
485                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
486            >(
487                self,
488                store: &mut DaemonStore<C>,
489            ) -> Result<Self::T, E> {
490                Ok((
491                    wire::read_string(&mut store.conn)
492                        .await
493                        .with_field("name")?,
494                    wire::read_pathinfo(&mut store.conn, store.proto)
495                        .await
496                        .with_field("PathInfo")?,
497                ))
498            }
499        }
500
501        DaemonProgress::new(
502            self,
503            Caller {
504                name,
505                cam_str,
506                refs,
507                repair,
508                source,
509            },
510            Returner,
511        )
512    }
513
514    #[instrument(skip(self))]
515    fn build_paths<Paths>(
516        &mut self,
517        paths: Paths,
518        mode: BuildMode,
519    ) -> impl Progress<T = (), Error = Self::Error>
520    where
521        Paths: IntoIterator + Send + Debug,
522        Paths::IntoIter: ExactSizeIterator + Send,
523        Paths::Item: AsRef<str> + Send + Sync,
524    {
525        struct Caller<Paths>
526        where
527            Paths: IntoIterator + Send + Debug,
528            Paths::IntoIter: ExactSizeIterator + Send,
529            Paths::Item: AsRef<str> + Send + Sync,
530        {
531            paths: Paths,
532            mode: BuildMode,
533        }
534        impl<Paths> DaemonProgressCaller for Caller<Paths>
535        where
536            Paths: IntoIterator + Send + Debug,
537            Paths::IntoIter: ExactSizeIterator + Send,
538            Paths::Item: AsRef<str> + Send + Sync,
539        {
540            async fn call<
541                E: From<Error> + From<std::io::Error> + Send + Sync,
542                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
543            >(
544                self,
545                store: &mut DaemonStore<C>,
546            ) -> Result<(), E> {
547                wire::write_op(&mut store.conn, wire::Op::BuildPaths)
548                    .await
549                    .with_field("BuildPaths.<op>")?;
550                wire::write_strings(&mut store.conn, self.paths)
551                    .await
552                    .with_field("BuildPaths.paths")?;
553                if store.proto >= Proto(1, 15) {
554                    wire::write_build_mode(&mut store.conn, self.mode)
555                        .await
556                        .with_field("BuildPaths.build_mode")?;
557                }
558                Ok(())
559            }
560        }
561
562        struct Returner;
563        impl DaemonProgressReturner for Returner {
564            type T = ();
565            async fn result<
566                E: From<Error> + From<std::io::Error> + Send + Sync,
567                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
568            >(
569                self,
570                store: &mut DaemonStore<C>,
571            ) -> Result<Self::T, E> {
572                wire::read_u64(&mut store.conn)
573                    .await
574                    .with_field("__unused__")?;
575                Ok(())
576            }
577        }
578
579        DaemonProgress::new(self, Caller { paths, mode }, Returner)
580    }
581
582    #[instrument(skip(self))]
583    fn ensure_path<Path: AsRef<str> + Send + Sync + Debug>(
584        &mut self,
585        path: Path,
586    ) -> impl Progress<T = (), Error = Self::Error> {
587        struct Caller<Path: AsRef<str> + Send + Sync + Debug> {
588            path: Path,
589        }
590        impl<Path: AsRef<str> + Send + Sync + Debug> DaemonProgressCaller for Caller<Path> {
591            async fn call<
592                E: From<Error> + From<std::io::Error> + Send + Sync,
593                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
594            >(
595                self,
596                store: &mut DaemonStore<C>,
597            ) -> Result<(), E> {
598                wire::write_op(&mut store.conn, wire::Op::EnsurePath)
599                    .await
600                    .with_field("EnsurePath.<op>")?;
601                wire::write_string(&mut store.conn, self.path)
602                    .await
603                    .with_field("EnsurePath.path")?;
604                Ok(())
605            }
606        }
607
608        struct Returner;
609        impl DaemonProgressReturner for Returner {
610            type T = ();
611            async fn result<
612                E: From<Error> + From<std::io::Error> + Send + Sync,
613                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
614            >(
615                self,
616                store: &mut DaemonStore<C>,
617            ) -> Result<Self::T, E> {
618                wire::read_u64(&mut store.conn)
619                    .await
620                    .with_field("__unused__")?;
621                Ok(())
622            }
623        }
624
625        DaemonProgress::new(self, Caller { path }, Returner)
626    }
627
628    #[instrument(skip(self))]
629    fn add_temp_root<Path: AsRef<str> + Send + Sync + Debug>(
630        &mut self,
631        path: Path,
632    ) -> impl Progress<T = (), Error = Self::Error> {
633        struct Caller<Path: AsRef<str> + Send + Sync + Debug> {
634            path: Path,
635        }
636        impl<Path: AsRef<str> + Send + Sync + Debug> DaemonProgressCaller for Caller<Path> {
637            async fn call<
638                E: From<Error> + From<std::io::Error> + Send + Sync,
639                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
640            >(
641                self,
642                store: &mut DaemonStore<C>,
643            ) -> Result<(), E> {
644                wire::write_op(&mut store.conn, wire::Op::AddTempRoot)
645                    .await
646                    .with_field("AddTempRoot.<op>")?;
647                wire::write_string(&mut store.conn, self.path)
648                    .await
649                    .with_field("AddTempRoot.path")?;
650                Ok(())
651            }
652        }
653
654        struct Returner;
655        impl DaemonProgressReturner for Returner {
656            type T = ();
657            async fn result<
658                E: From<Error> + From<std::io::Error> + Send + Sync,
659                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
660            >(
661                self,
662                store: &mut DaemonStore<C>,
663            ) -> Result<Self::T, E> {
664                wire::read_u64(&mut store.conn)
665                    .await
666                    .with_field("__unused__")?;
667                Ok(())
668            }
669        }
670
671        DaemonProgress::new(self, Caller { path }, Returner)
672    }
673
674    #[instrument(skip(self))]
675    fn add_indirect_root<Path: AsRef<str> + Send + Sync + Debug>(
676        &mut self,
677        path: Path,
678    ) -> impl Progress<T = (), Error = Self::Error> {
679        struct Caller<Path: AsRef<str> + Send + Sync + Debug> {
680            path: Path,
681        }
682        impl<Path: AsRef<str> + Send + Sync + Debug> DaemonProgressCaller for Caller<Path> {
683            async fn call<
684                E: From<Error> + From<std::io::Error> + Send + Sync,
685                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
686            >(
687                self,
688                store: &mut DaemonStore<C>,
689            ) -> Result<(), E> {
690                wire::write_op(&mut store.conn, wire::Op::AddIndirectRoot)
691                    .await
692                    .with_field("AddIndirectRoot.<op>")?;
693                wire::write_string(&mut store.conn, self.path)
694                    .await
695                    .with_field("AddIndirectRoot.path")?;
696                Ok(())
697            }
698        }
699
700        struct Returner;
701        impl DaemonProgressReturner for Returner {
702            type T = ();
703            async fn result<
704                E: From<Error> + From<std::io::Error> + Send + Sync,
705                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
706            >(
707                self,
708                store: &mut DaemonStore<C>,
709            ) -> Result<Self::T, E> {
710                wire::read_u64(&mut store.conn)
711                    .await
712                    .with_field("__unused__")?;
713                Ok(())
714            }
715        }
716
717        DaemonProgress::new(self, Caller { path }, Returner)
718    }
719
720    #[instrument(skip(self))]
721    fn find_roots(&mut self) -> impl Progress<T = HashMap<String, String>, Error = Self::Error> {
722        struct Caller;
723        impl DaemonProgressCaller for Caller {
724            async fn call<
725                E: From<Error> + From<std::io::Error> + Send + Sync,
726                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
727            >(
728                self,
729                store: &mut DaemonStore<C>,
730            ) -> Result<(), E> {
731                wire::write_op(&mut store.conn, wire::Op::FindRoots)
732                    .await
733                    .with_field("FindRoots.<op>")?;
734                Ok(())
735            }
736        }
737
738        struct Returner;
739        impl DaemonProgressReturner for Returner {
740            type T = HashMap<String, String>;
741            async fn result<
742                E: From<Error> + From<std::io::Error> + Send + Sync,
743                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
744            >(
745                self,
746                store: &mut DaemonStore<C>,
747            ) -> Result<Self::T, E> {
748                let count = wire::read_u64(&mut store.conn)
749                    .await
750                    .with_field("FindRoots.roots[].<count>")?;
751                let mut roots = HashMap::with_capacity(count as usize);
752                for _ in 0..count {
753                    let link = wire::read_string(&mut store.conn)
754                        .await
755                        .with_field("FindRoots.roots[].link")?;
756                    let target = wire::read_string(&mut store.conn)
757                        .await
758                        .with_field("FindRoots.roots[].target")?;
759                    roots.insert(link, target);
760                }
761                Ok(roots)
762            }
763        }
764
765        DaemonProgress::new(self, Caller, Returner)
766    }
767
768    #[instrument(skip(self))]
769    fn set_options(&mut self, opts: ClientSettings) -> impl Progress<T = (), Error = Self::Error> {
770        struct Caller {
771            opts: ClientSettings,
772        }
773        impl DaemonProgressCaller for Caller {
774            async fn call<
775                E: From<Error> + From<std::io::Error> + Send + Sync,
776                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
777            >(
778                self,
779                store: &mut DaemonStore<C>,
780            ) -> Result<(), E> {
781                wire::write_op(&mut store.conn, wire::Op::SetOptions)
782                    .await
783                    .with_field("SetOptions.<op>")?;
784                wire::write_client_settings(&mut store.conn, store.proto, &self.opts)
785                    .await
786                    .with_field("SetOptions.clientSettings")?;
787                Ok(())
788            }
789        }
790
791        struct Returner;
792        impl DaemonProgressReturner for Returner {
793            type T = ();
794            async fn result<
795                E: From<Error> + From<std::io::Error> + Send + Sync,
796                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
797            >(
798                self,
799                _store: &mut DaemonStore<C>,
800            ) -> Result<Self::T, E> {
801                Ok(())
802            }
803        }
804
805        DaemonProgress::new(self, Caller { opts }, Returner)
806    }
807
808    #[instrument(skip(self))]
809    fn query_pathinfo<S: AsRef<str> + Send + Sync + Debug>(
810        &mut self,
811        path: S,
812    ) -> impl Progress<T = Option<PathInfo>, Error = Self::Error> {
813        struct Caller<S: AsRef<str> + Send + Sync + Debug> {
814            path: S,
815        }
816        impl<S: AsRef<str> + Send + Sync + Debug> DaemonProgressCaller for Caller<S> {
817            async fn call<
818                E: From<Error> + From<std::io::Error> + Send + Sync,
819                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
820            >(
821                self,
822                store: &mut DaemonStore<C>,
823            ) -> Result<(), E> {
824                wire::write_op(&mut store.conn, wire::Op::QueryPathInfo)
825                    .await
826                    .with_field("QueryPathInfo.<op>")?;
827                wire::write_string(&mut store.conn, &self.path)
828                    .await
829                    .with_field("QueryPathInfo.path")?;
830                Ok(())
831            }
832        }
833
834        struct Returner;
835        impl DaemonProgressReturner for Returner {
836            type T = Option<PathInfo>;
837            async fn result<
838                E: From<Error> + From<std::io::Error> + Send + Sync,
839                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
840            >(
841                self,
842                store: &mut DaemonStore<C>,
843            ) -> Result<Self::T, E> {
844                if wire::read_bool(&mut store.conn).await? {
845                    Ok(Some(
846                        wire::read_pathinfo(&mut store.conn, store.proto).await?,
847                    ))
848                } else {
849                    Ok(None)
850                }
851            }
852        }
853
854        DaemonProgress::new(self, Caller { path }, Returner)
855    }
856
857    #[instrument(skip(self))]
858    fn query_valid_paths<Paths>(
859        &mut self,
860        paths: Paths,
861        use_substituters: bool,
862    ) -> impl Progress<T = Vec<String>, Error = Self::Error>
863    where
864        Paths: IntoIterator + Send + Debug,
865        Paths::IntoIter: ExactSizeIterator + Send,
866        Paths::Item: AsRef<str> + Send + Sync,
867    {
868        struct Caller<Paths>
869        where
870            Paths: IntoIterator + Send + Debug,
871            Paths::IntoIter: ExactSizeIterator + Send,
872            Paths::Item: AsRef<str> + Send + Sync,
873        {
874            paths: Paths,
875            use_substituters: bool,
876        }
877        impl<Paths> DaemonProgressCaller for Caller<Paths>
878        where
879            Paths: IntoIterator + Send + Debug,
880            Paths::IntoIter: ExactSizeIterator + Send,
881            Paths::Item: AsRef<str> + Send + Sync,
882        {
883            async fn call<
884                E: From<Error> + From<std::io::Error> + Send + Sync,
885                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
886            >(
887                self,
888                store: &mut DaemonStore<C>,
889            ) -> Result<(), E> {
890                match store.proto {
891                    Proto(1, 12..) => {
892                        wire::write_op(&mut store.conn, wire::Op::QueryValidPaths)
893                            .await
894                            .with_field("QueryValidPaths.<op>")?;
895                        wire::write_strings(&mut store.conn, self.paths)
896                            .await
897                            .with_field("QueryValidPaths.path")?;
898                        if store.proto >= Proto(1, 27) {
899                            wire::write_bool(&mut store.conn, self.use_substituters)
900                                .await
901                                .with_field("QueryValidPaths.use_substituters")?;
902                        }
903                        Ok(())
904                    }
905                    proto => Err(Error::Invalid(format!(
906                        "QueryValidPaths is not implemented for Protocol {}",
907                        proto
908                    ))
909                    .into()),
910                }
911            }
912        }
913
914        struct Returner;
915        impl DaemonProgressReturner for Returner {
916            type T = Vec<String>;
917            async fn result<
918                E: From<Error> + From<std::io::Error> + Send + Sync,
919                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
920            >(
921                self,
922                store: &mut DaemonStore<C>,
923            ) -> Result<Self::T, E> {
924                Ok(wire::read_strings(&mut store.conn)
925                    .collect::<Result<Vec<_>>>()
926                    .await?)
927            }
928        }
929
930        DaemonProgress::new(
931            self,
932            Caller {
933                paths,
934                use_substituters,
935            },
936            Returner,
937        )
938    }
939
940    #[instrument(skip(self))]
941    fn query_substitutable_paths<Paths>(
942        &mut self,
943        paths: Paths,
944    ) -> impl Progress<T = Vec<String>, Error = Self::Error>
945    where
946        Paths: IntoIterator + Send + Debug,
947        Paths::IntoIter: ExactSizeIterator + Send,
948        Paths::Item: AsRef<str> + Send + Sync,
949    {
950        struct Caller<Paths>
951        where
952            Paths: IntoIterator + Send + Debug,
953            Paths::IntoIter: ExactSizeIterator + Send,
954            Paths::Item: AsRef<str> + Send + Sync,
955        {
956            paths: Paths,
957        }
958        impl<Paths> DaemonProgressCaller for Caller<Paths>
959        where
960            Paths: IntoIterator + Send + Debug,
961            Paths::IntoIter: ExactSizeIterator + Send,
962            Paths::Item: AsRef<str> + Send + Sync,
963        {
964            async fn call<
965                E: From<Error> + From<std::io::Error> + Send + Sync,
966                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
967            >(
968                self,
969                store: &mut DaemonStore<C>,
970            ) -> Result<(), E> {
971                wire::write_op(&mut store.conn, wire::Op::QuerySubstitutablePaths)
972                    .await
973                    .with_field("QuerySubstitutablePaths.<op>")?;
974                wire::write_strings(&mut store.conn, self.paths)
975                    .await
976                    .with_field("QuerySubstitutablePaths.path")?;
977                Ok(())
978            }
979        }
980
981        struct Returner;
982        impl DaemonProgressReturner for Returner {
983            type T = Vec<String>;
984            async fn result<
985                E: From<Error> + From<std::io::Error> + Send + Sync,
986                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
987            >(
988                self,
989                store: &mut DaemonStore<C>,
990            ) -> Result<Self::T, E> {
991                Ok(wire::read_strings(&mut store.conn)
992                    .collect::<Result<Vec<_>>>()
993                    .await?)
994            }
995        }
996
997        DaemonProgress::new(self, Caller { paths }, Returner)
998    }
999
1000    #[instrument(skip(self))]
1001    fn query_valid_derivers<S: AsRef<str> + Send + Sync + Debug>(
1002        &mut self,
1003        path: S,
1004    ) -> impl Progress<T = Vec<String>, Error = Self::Error> {
1005        struct Caller<S: AsRef<str> + Send + Sync + Debug> {
1006            path: S,
1007        }
1008        impl<S: AsRef<str> + Send + Sync + Debug> DaemonProgressCaller for Caller<S> {
1009            async fn call<
1010                E: From<Error> + From<std::io::Error> + Send + Sync,
1011                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
1012            >(
1013                self,
1014                store: &mut DaemonStore<C>,
1015            ) -> Result<(), E> {
1016                wire::write_op(&mut store.conn, wire::Op::QueryValidDerivers)
1017                    .await
1018                    .with_field("QueryValidDerivers.<op>")?;
1019                wire::write_string(&mut store.conn, &self.path)
1020                    .await
1021                    .with_field("QueryValidDerivers.path")?;
1022                Ok(())
1023            }
1024        }
1025
1026        struct Returner;
1027        impl DaemonProgressReturner for Returner {
1028            type T = Vec<String>;
1029            async fn result<
1030                E: From<Error> + From<std::io::Error> + Send + Sync,
1031                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
1032            >(
1033                self,
1034                store: &mut DaemonStore<C>,
1035            ) -> Result<Self::T, E> {
1036                Ok(wire::read_strings(&mut store.conn)
1037                    .collect::<Result<Vec<String>>>()
1038                    .await
1039                    .with_field("QueryValidDerivers.paths")?)
1040            }
1041        }
1042
1043        DaemonProgress::new(self, Caller { path }, Returner)
1044    }
1045
1046    #[instrument(skip(self))]
1047    fn query_missing<Ps>(
1048        &mut self,
1049        paths: Ps,
1050    ) -> impl Progress<T = crate::Missing, Error = Self::Error>
1051    where
1052        Ps: IntoIterator + Send + Debug,
1053        Ps::IntoIter: ExactSizeIterator + Send,
1054        Ps::Item: AsRef<str> + Send + Sync,
1055    {
1056        struct Caller<Ps>
1057        where
1058            Ps: IntoIterator + Send + Debug,
1059            Ps::IntoIter: ExactSizeIterator + Send,
1060            Ps::Item: AsRef<str> + Send + Sync,
1061        {
1062            paths: Ps,
1063        }
1064        impl<Ps> DaemonProgressCaller for Caller<Ps>
1065        where
1066            Ps: IntoIterator + Send + Debug,
1067            Ps::IntoIter: ExactSizeIterator + Send,
1068            Ps::Item: AsRef<str> + Send + Sync,
1069        {
1070            async fn call<
1071                E: From<Error> + From<std::io::Error> + Send + Sync,
1072                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
1073            >(
1074                self,
1075                store: &mut DaemonStore<C>,
1076            ) -> Result<(), E> {
1077                wire::write_op(&mut store.conn, wire::Op::QueryMissing)
1078                    .await
1079                    .with_field("QueryMissing.<op>")?;
1080                wire::write_strings(&mut store.conn, self.paths)
1081                    .await
1082                    .with_field("QueryMissing.paths")?;
1083                Ok(())
1084            }
1085        }
1086
1087        struct Returner;
1088        impl DaemonProgressReturner for Returner {
1089            type T = crate::Missing;
1090            async fn result<
1091                E: From<Error> + From<std::io::Error> + Send + Sync,
1092                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
1093            >(
1094                self,
1095                store: &mut DaemonStore<C>,
1096            ) -> Result<Self::T, E> {
1097                let will_build = wire::read_strings(&mut store.conn)
1098                    .collect::<Result<Vec<String>>>()
1099                    .await
1100                    .with_field("QueryMissing.will_build")?;
1101                let will_substitute = wire::read_strings(&mut store.conn)
1102                    .collect::<Result<Vec<String>>>()
1103                    .await
1104                    .with_field("QueryMissing.will_substitute")?;
1105                let unknown = wire::read_strings(&mut store.conn)
1106                    .collect::<Result<Vec<String>>>()
1107                    .await
1108                    .with_field("QueryMissing.unknown")?;
1109                let download_size = wire::read_u64(&mut store.conn)
1110                    .await
1111                    .with_field("QueryMissing.download_size")?;
1112                let nar_size = wire::read_u64(&mut store.conn)
1113                    .await
1114                    .with_field("QueryMissing.nar_size")?;
1115                Ok(crate::Missing {
1116                    will_build,
1117                    will_substitute,
1118                    unknown,
1119                    download_size,
1120                    nar_size,
1121                })
1122            }
1123        }
1124
1125        DaemonProgress::new(self, Caller { paths }, Returner)
1126    }
1127
1128    #[instrument(skip(self))]
1129    fn query_derivation_output_map<P: AsRef<str> + Send + Sync + Debug>(
1130        &mut self,
1131        path: P,
1132    ) -> impl Progress<T = HashMap<String, String>, Error = Self::Error> {
1133        struct Caller<P: AsRef<str> + Send + Sync + Debug> {
1134            path: P,
1135        }
1136        impl<P: AsRef<str> + Send + Sync + Debug> DaemonProgressCaller for Caller<P> {
1137            async fn call<
1138                E: From<Error> + From<std::io::Error> + Send + Sync,
1139                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
1140            >(
1141                self,
1142                store: &mut DaemonStore<C>,
1143            ) -> Result<(), E> {
1144                wire::write_op(&mut store.conn, wire::Op::QueryDerivationOutputMap)
1145                    .await
1146                    .with_field("QueryDerivationOutputMap.<op>")?;
1147                wire::write_string(&mut store.conn, self.path)
1148                    .await
1149                    .with_field("QueryDerivationOutputMap.paths")?;
1150                Ok(())
1151            }
1152        }
1153
1154        struct Returner;
1155        impl DaemonProgressReturner for Returner {
1156            type T = HashMap<String, String>;
1157            async fn result<
1158                E: From<Error> + From<std::io::Error> + Send + Sync,
1159                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
1160            >(
1161                self,
1162                store: &mut DaemonStore<C>,
1163            ) -> Result<Self::T, E> {
1164                let mut outputs = HashMap::new();
1165                let count = wire::read_u64(&mut store.conn)
1166                    .await
1167                    .with_field("QueryDerivationOutputMap.outputs[].<count>")?;
1168                for _ in 0..count {
1169                    let name = wire::read_string(&mut store.conn)
1170                        .await
1171                        .with_field("QueryDerivationOutputMap.outputs[].name")?;
1172                    let path = wire::read_string(&mut store.conn)
1173                        .await
1174                        .with_field("QueryDerivationOutputMap.outputs[].path")?;
1175                    outputs.insert(name, path);
1176                }
1177                Ok(outputs)
1178            }
1179        }
1180
1181        DaemonProgress::new(self, Caller { path }, Returner)
1182    }
1183
1184    #[instrument(skip(self))]
1185    fn build_paths_with_results<Ps>(
1186        &mut self,
1187        paths: Ps,
1188        mode: BuildMode,
1189    ) -> impl Progress<T = HashMap<String, crate::BuildResult>, Error = Self::Error>
1190    where
1191        Ps: IntoIterator + Send + Debug,
1192        Ps::IntoIter: ExactSizeIterator + Send,
1193        Ps::Item: AsRef<str> + Send + Sync,
1194    {
1195        struct Caller<Ps>
1196        where
1197            Ps: IntoIterator + Send + Debug,
1198            Ps::IntoIter: ExactSizeIterator + Send,
1199            Ps::Item: AsRef<str> + Send + Sync,
1200        {
1201            paths: Ps,
1202            mode: BuildMode,
1203        }
1204        impl<Ps> DaemonProgressCaller for Caller<Ps>
1205        where
1206            Ps: IntoIterator + Send + Debug,
1207            Ps::IntoIter: ExactSizeIterator + Send,
1208            Ps::Item: AsRef<str> + Send + Sync,
1209        {
1210            async fn call<
1211                E: From<Error> + From<std::io::Error> + Send + Sync,
1212                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
1213            >(
1214                self,
1215                store: &mut DaemonStore<C>,
1216            ) -> Result<(), E> {
1217                wire::write_op(&mut store.conn, wire::Op::BuildPathsWithResults)
1218                    .await
1219                    .with_field("BuildPathsWithResults.<op>")?;
1220                wire::write_strings(&mut store.conn, self.paths)
1221                    .await
1222                    .with_field("BuildPathsWithResults.paths")?;
1223                wire::write_build_mode(&mut store.conn, self.mode)
1224                    .await
1225                    .with_field("BuildPathsWithResults.build_mode")?;
1226                Ok(())
1227            }
1228        }
1229
1230        struct Returner;
1231        impl DaemonProgressReturner for Returner {
1232            type T = HashMap<String, crate::BuildResult>;
1233            async fn result<
1234                E: From<Error> + From<std::io::Error> + Send + Sync,
1235                C: AsyncReadExt + AsyncWriteExt + Unpin + Send,
1236            >(
1237                self,
1238                store: &mut DaemonStore<C>,
1239            ) -> Result<Self::T, E> {
1240                let count = wire::read_u64(&mut store.conn)
1241                    .await
1242                    .with_field("BuildPathsWithResults.results.<count>")?;
1243                let mut results = HashMap::with_capacity(count as usize);
1244                for _ in 0..count {
1245                    let path = wire::read_string(&mut store.conn)
1246                        .await
1247                        .with_field("BuildPathsWithResults.results[].path")?;
1248                    let result = wire::read_build_result(&mut store.conn, store.proto)
1249                        .await
1250                        .with_field("BuildPathsWithResults.results[].result")?;
1251                    results.insert(path, result);
1252                }
1253                Ok(results)
1254            }
1255        }
1256        DaemonProgress::new(self, Caller { paths, mode }, Returner)
1257    }
1258}
1259
1260#[derive(Debug)]
1261pub struct DaemonProtocolAdapterBuilder<'s, S: Store> {
1262    pub store: &'s mut S,
1263    pub nix_version: String,
1264    pub remote_trust: Option<bool>,
1265}
1266
1267impl<'s, S: Store> DaemonProtocolAdapterBuilder<'s, S> {
1268    fn new(store: &'s mut S) -> Self {
1269        Self {
1270            store,
1271            nix_version: concat!("gorgon/nix-daemon ", env!("CARGO_PKG_VERSION")).to_string(),
1272            remote_trust: None,
1273        }
1274    }
1275
1276    /// Initializes a [`DaemonProtocolAdapter`] by adopting a connection.
1277    ///
1278    /// It's up to the caller that the connection is in a state to begin a nix handshake, eg.
1279    /// it behaves like a fresh connection to the daemon socket - if this is a connection through
1280    /// a proxy, any proxy handshakes should already have taken place, etc.
1281    pub async fn adopt<
1282        R: AsyncReadExt + Unpin + Send + Debug,
1283        W: AsyncWriteExt + Unpin + Send + Debug,
1284    >(
1285        self,
1286        r: R,
1287        w: W,
1288    ) -> Result<DaemonProtocolAdapter<'s, S, R, W>> {
1289        DaemonProtocolAdapter::handshake(r, w, self.store, self.nix_version, self.remote_trust)
1290            .await
1291    }
1292}
1293
1294/// Handles an incoming `nix-daemon` protocol connection, and forwards calls to a
1295/// [`crate::Store`].
1296///
1297/// ```no_run
1298/// use tokio::net::UnixListener;
1299/// use nix_daemon::nix::{DaemonStore, DaemonProtocolAdapter};
1300///
1301/// # async {
1302/// // Accept a connection.
1303/// let listener = UnixListener::bind("/tmp/nix-proxy.sock")?;
1304/// let (conn, _addr) = listener.accept().await?;
1305///
1306/// // Connect to the real store.
1307/// let mut store = DaemonStore::builder()
1308///     .connect_unix("/nix/var/nix/daemon-socket/socket")
1309///     .await?;
1310///
1311/// // Proxy the connection to it.
1312/// let (cr, cw) = conn.into_split();
1313/// let mut adapter = DaemonProtocolAdapter::builder(&mut store)
1314///     .adopt(cr, cw)
1315///     .await?;
1316/// Ok::<(),nix_daemon::Error>(adapter.run().await?)
1317/// # };
1318/// ```
1319///
1320/// See [nix-supervisor](https://codeberg.org/gorgon/gorgon/src/branch/main/nix-supervisor) for
1321/// a more advanced example of how to use this (with a custom Store implementation).
1322pub struct DaemonProtocolAdapter<'s, S: Store, R, W>
1323where
1324    R: AsyncReadExt + Unpin + Send + Debug,
1325    W: AsyncWriteExt + Unpin + Send + Debug,
1326{
1327    r: R,
1328    w: W,
1329    store: &'s mut S,
1330    /// Negotiated protocol version.
1331    pub proto: Proto,
1332}
1333
1334impl<'s, S: Store>
1335    DaemonProtocolAdapter<'s, S, tokio::net::unix::OwnedReadHalf, tokio::net::unix::OwnedWriteHalf>
1336{
1337    pub fn builder(store: &'s mut S) -> DaemonProtocolAdapterBuilder<'s, S> {
1338        DaemonProtocolAdapterBuilder::new(store)
1339    }
1340}
1341
1342impl<'s, S: Store, R, W> DaemonProtocolAdapter<'s, S, R, W>
1343where
1344    R: AsyncReadExt + Unpin + Send + Debug,
1345    W: AsyncWriteExt + Unpin + Send + Debug,
1346{
1347    #[instrument(skip(r, w, store))]
1348    async fn handshake(
1349        mut r: R,
1350        mut w: W,
1351        store: &'s mut S,
1352        nix_version: String,
1353        remote_trust: Option<bool>,
1354    ) -> Result<Self> {
1355        // Exchange magic numbers.
1356        match wire::read_u64(&mut r).await {
1357            Ok(magic1 @ wire::WORKER_MAGIC_1) => Ok(magic1),
1358            Ok(v) => Err(Error::Invalid(format!("{:#x}", v))),
1359            Err(err) => Err(err.into()),
1360        }
1361        .with_field("magic1")?;
1362        wire::write_u64(&mut w, wire::WORKER_MAGIC_2)
1363            .await
1364            .with_field("magic2")?;
1365
1366        // Tell the client our latest supported protocol version, then they pick that or lower.
1367        wire::write_proto(&mut w, MAX_PROTO)
1368            .await
1369            .with_field("daemon_proto")?;
1370        let proto = wire::read_proto(&mut r)
1371            .await
1372            .and_then(|proto| {
1373                if proto.0 != 1 || proto < MIN_PROTO {
1374                    return Err(Error::Invalid(format!("{}", proto)));
1375                }
1376                Ok(proto)
1377            })
1378            .with_field("client_proto")?;
1379
1380        // Discard some obsolete fields.
1381        if proto >= Proto(1, 14) {
1382            wire::read_u64(&mut r)
1383                .await
1384                .with_field("__obsolete_cpu_affinity")?;
1385        }
1386        if proto >= Proto(1, 11) {
1387            wire::read_bool(&mut r)
1388                .await
1389                .with_field("__obsolete_reserve_space")?;
1390        }
1391
1392        // And use values from the builder for these.
1393        if proto >= Proto(1, 33) {
1394            wire::write_string(&mut w, &nix_version)
1395                .await
1396                .with_field("nix_version")?;
1397        }
1398        if proto >= Proto(1, 35) {
1399            wire::write_u64(
1400                &mut w,
1401                match remote_trust {
1402                    None => 0,
1403                    Some(true) => 1,
1404                    Some(false) => 2,
1405                },
1406            )
1407            .await
1408            .with_field("remote_trust")?;
1409        }
1410
1411        // Stderr is always empty.
1412        wire::write_stderr(&mut w, None)
1413            .await
1414            .with_field("stderr")?;
1415        Ok(Self { r, w, store, proto })
1416    }
1417
1418    /// Runs the connection until the client disconnects. TODO: Cancellation.
1419    pub async fn run(&mut self) -> Result<(), S::Error> {
1420        loop {
1421            match wire::read_op(&mut self.r).await {
1422                Ok(wire::Op::IsValidPath) => {
1423                    let path = wire::read_string(&mut self.r)
1424                        .await
1425                        .with_field("IsValidPath.path")?;
1426
1427                    let is_valid =
1428                        forward_stderr(&mut self.w, self.store.is_valid_path(path)).await?;
1429                    wire::write_bool(&mut self.w, is_valid)
1430                        .await
1431                        .with_field("IsValidPath.is_valid")?;
1432                }
1433                Ok(wire::Op::HasSubstitutes) => {
1434                    let path = wire::read_string(&mut self.r)
1435                        .await
1436                        .with_field("HasSubstitutes.path")?;
1437                    let has_substitutes =
1438                        forward_stderr(&mut self.w, self.store.has_substitutes(path)).await?;
1439                    wire::write_bool(&mut self.w, has_substitutes)
1440                        .await
1441                        .with_field("HasSubstitutes.has_substitutes")?;
1442                }
1443                Ok(wire::Op::AddToStore) => match self.proto {
1444                    Proto(1, 25..) => {
1445                        let name = wire::read_string(&mut self.r)
1446                            .await
1447                            .with_field("AddToStore.name")?;
1448                        let cam_str = wire::read_string(&mut self.r)
1449                            .await
1450                            .with_field("AddToStore.camStr")?;
1451                        let refs = wire::read_strings(&mut self.r)
1452                            .collect::<Result<Vec<_>>>()
1453                            .await
1454                            .with_field("AddToStore.refs")?;
1455                        let repair = wire::read_bool(&mut self.r)
1456                            .await
1457                            .with_field("AddToStore.repair")?;
1458                        let source = wire::FramedReader::new(&mut self.r);
1459
1460                        let (name, pi) = forward_stderr(
1461                            &mut self.w,
1462                            self.store.add_to_store(name, cam_str, refs, repair, source),
1463                        )
1464                        .await?;
1465                        wire::write_string(&mut self.w, name)
1466                            .await
1467                            .with_field("AddToStore.name")?;
1468                        wire::write_pathinfo(&mut self.w, self.proto, &pi)
1469                            .await
1470                            .with_field("AddToStore.pi")?;
1471                    }
1472                    _ => {
1473                        return Err(Error::Invalid(format!(
1474                            "AddToStore is not implemented for Protocol {}",
1475                            self.proto
1476                        ))
1477                        .into())
1478                    }
1479                },
1480                Ok(wire::Op::BuildPaths) => {
1481                    let paths = wire::read_strings(&mut self.r)
1482                        .collect::<Result<Vec<_>>>()
1483                        .await
1484                        .with_field("BuildPaths.paths")?;
1485                    let mode = if self.proto >= Proto(1, 15) {
1486                        wire::read_build_mode(&mut self.r)
1487                            .await
1488                            .with_field("BuildPaths.build_mode")?
1489                    } else {
1490                        BuildMode::Normal
1491                    };
1492
1493                    forward_stderr(&mut self.w, self.store.build_paths(paths, mode)).await?;
1494                    wire::write_u64(&mut self.w, 1)
1495                        .await
1496                        .with_field("BuildPaths.__unused__")?;
1497                }
1498                Ok(wire::Op::EnsurePath) => {
1499                    let path = wire::read_string(&mut self.r)
1500                        .await
1501                        .with_field("EnsurePath.path")?;
1502                    forward_stderr(&mut self.w, self.store.ensure_path(path)).await?;
1503                    wire::write_u64(&mut self.w, 1)
1504                        .await
1505                        .with_field("EnsurePath.__unused__")?;
1506                }
1507                Ok(wire::Op::AddTempRoot) => {
1508                    let path = wire::read_string(&mut self.r)
1509                        .await
1510                        .with_field("AddTempRoot.path")?;
1511
1512                    forward_stderr(&mut self.w, self.store.add_temp_root(path)).await?;
1513                    wire::write_u64(&mut self.w, 1)
1514                        .await
1515                        .with_field("AddTempRoot.__unused__")?;
1516                }
1517                Ok(wire::Op::AddIndirectRoot) => {
1518                    let path = wire::read_string(&mut self.r)
1519                        .await
1520                        .with_field("AddIndirectRoot.path")?;
1521
1522                    forward_stderr(&mut self.w, self.store.add_indirect_root(path)).await?;
1523                    wire::write_u64(&mut self.w, 1)
1524                        .await
1525                        .with_field("AddIndirectRoot.__unused__")?;
1526                }
1527                Ok(wire::Op::FindRoots) => {
1528                    let roots = forward_stderr(&mut self.w, self.store.find_roots()).await?;
1529
1530                    wire::write_u64(&mut self.w, roots.len() as u64)
1531                        .await
1532                        .with_field("FindRoots.roots[].<count>")?;
1533                    for (link, target) in roots {
1534                        wire::write_string(&mut self.w, link)
1535                            .await
1536                            .with_field("FindRoots.roots[].link")?;
1537                        wire::write_string(&mut self.w, target)
1538                            .await
1539                            .with_field("FindRoots.roots[].target")?;
1540                    }
1541                }
1542                Ok(wire::Op::SetOptions) => {
1543                    let ops = wire::read_client_settings(&mut self.r, self.proto)
1544                        .await
1545                        .with_field("SetOptions.clientSettings")?;
1546                    forward_stderr(&mut self.w, self.store.set_options(ops)).await?;
1547                }
1548                Ok(wire::Op::QueryPathInfo) => {
1549                    let path = wire::read_string(&mut self.r)
1550                        .await
1551                        .with_field("QueryPathInfo.path")?;
1552
1553                    let pi = forward_stderr(&mut self.w, self.store.query_pathinfo(path)).await?;
1554
1555                    wire::write_bool(&mut self.w, pi.is_some())
1556                        .await
1557                        .with_field("QueryPathInfo.is_valid")?;
1558                    if let Some(pi) = pi {
1559                        wire::write_pathinfo(&mut self.w, self.proto, &pi)
1560                            .await
1561                            .with_field("QueryPathInfo.path_info")?;
1562                    }
1563                }
1564                Ok(wire::Op::QueryValidPaths) => match self.proto {
1565                    Proto(1, 12..) => {
1566                        let paths = wire::read_strings(&mut self.r)
1567                            .collect::<Result<Vec<_>>>()
1568                            .await
1569                            .with_field("QueryValidPaths.path")?;
1570                        let use_substituters = if self.proto >= Proto(1, 27) {
1571                            wire::read_bool(&mut self.r)
1572                                .await
1573                                .with_field("QueryValidPaths.use_substituters")?
1574                        } else {
1575                            true
1576                        };
1577
1578                        let valid_paths = forward_stderr(
1579                            &mut self.w,
1580                            self.store.query_valid_paths(paths, use_substituters),
1581                        )
1582                        .await?;
1583
1584                        wire::write_strings(&mut self.w, valid_paths)
1585                            .await
1586                            .with_field("QueryValidPaths.valid_path")?;
1587                    }
1588                    _ => {
1589                        return Err(Error::Invalid(format!(
1590                            "QueryValidPaths is not implemented for Protocol {}",
1591                            self.proto
1592                        ))
1593                        .into())
1594                    }
1595                },
1596                Ok(wire::Op::QuerySubstitutablePaths) => {
1597                    let paths = wire::read_strings(&mut self.r)
1598                        .collect::<Result<Vec<_>>>()
1599                        .await
1600                        .with_field("QuerySubstitutablePaths.paths")?;
1601                    let sub_paths =
1602                        forward_stderr(&mut self.w, self.store.query_substitutable_paths(paths))
1603                            .await?;
1604                    wire::write_strings(&mut self.w, sub_paths)
1605                        .await
1606                        .with_field("QuerySubstitutablePaths.sub_paths")?;
1607                }
1608                Ok(wire::Op::QueryMissing) => {
1609                    let paths = wire::read_strings(&mut self.r)
1610                        .collect::<Result<Vec<_>>>()
1611                        .await
1612                        .with_field("QueryMissing.paths")?;
1613
1614                    let crate::Missing {
1615                        will_build,
1616                        will_substitute,
1617                        unknown,
1618                        download_size,
1619                        nar_size,
1620                    } = forward_stderr(&mut self.w, self.store.query_missing(paths)).await?;
1621
1622                    wire::write_strings(&mut self.w, will_build)
1623                        .await
1624                        .with_field("QueryMissing.will_build")?;
1625                    wire::write_strings(&mut self.w, will_substitute)
1626                        .await
1627                        .with_field("QueryMissing.will_substitute")?;
1628                    wire::write_strings(&mut self.w, unknown)
1629                        .await
1630                        .with_field("QueryMissing.unknown")?;
1631                    wire::write_u64(&mut self.w, download_size)
1632                        .await
1633                        .with_field("QueryMissing.download_size")?;
1634                    wire::write_u64(&mut self.w, nar_size)
1635                        .await
1636                        .with_field("QueryMissing.nar_size")?;
1637                }
1638                Ok(wire::Op::QueryValidDerivers) => {
1639                    let path = wire::read_string(&mut self.r)
1640                        .await
1641                        .with_field("QueryValidDerivers.path")?;
1642
1643                    let derivers =
1644                        forward_stderr(&mut self.w, self.store.query_valid_derivers(path)).await?;
1645                    wire::write_strings(&mut self.w, derivers)
1646                        .await
1647                        .with_field("QueryValidDerivers.paths")?
1648                }
1649                Ok(wire::Op::QueryDerivationOutputMap) => {
1650                    let path = wire::read_string(&mut self.r)
1651                        .await
1652                        .with_field("QueryDerivationOutputMap.paths")?;
1653
1654                    let outputs =
1655                        forward_stderr(&mut self.w, self.store.query_derivation_output_map(path))
1656                            .await?;
1657                    wire::write_u64(&mut self.w, outputs.len() as u64)
1658                        .await
1659                        .with_field("QueryDerivationOutputMap.outputs[].<count>")?;
1660                    for (name, path) in outputs {
1661                        wire::write_string(&mut self.w, name)
1662                            .await
1663                            .with_field("QueryDerivationOutputMap.outputs[].name")?;
1664                        wire::write_string(&mut self.w, path)
1665                            .await
1666                            .with_field("QueryDerivationOutputMap.outputs[].path")?;
1667                    }
1668                }
1669                Ok(wire::Op::BuildPathsWithResults) => {
1670                    let paths = wire::read_strings(&mut self.r)
1671                        .collect::<Result<Vec<_>>>()
1672                        .await
1673                        .with_field("BuildPathsWithResults.paths")?;
1674                    let mode = wire::read_build_mode(&mut self.r)
1675                        .await
1676                        .with_field("BuildPathsWithResults.build_mode")?;
1677
1678                    let results = forward_stderr(
1679                        &mut self.w,
1680                        self.store.build_paths_with_results(paths, mode),
1681                    )
1682                    .await?;
1683
1684                    wire::write_u64(&mut self.w, results.len() as u64)
1685                        .await
1686                        .with_field("BuildPathsWithResults.results.<count>")?;
1687                    for (path, result) in results {
1688                        wire::write_string(&mut self.w, path)
1689                            .await
1690                            .with_field("BuildPathsWithResults.results[].path")?;
1691                        wire::write_build_result(&mut self.w, &result, self.proto)
1692                            .await
1693                            .with_field("BuildPathsWithResults.results[].result")?;
1694                    }
1695                }
1696                Ok(v) => todo!("{:#?}", v),
1697
1698                Err(Error::IO(err)) if err.kind() == std::io::ErrorKind::UnexpectedEof => {
1699                    info!("Client disconnected");
1700                    return Ok(());
1701                }
1702                Err(err) => return Err(err.into()),
1703            }
1704        }
1705    }
1706}
1707
1708async fn forward_stderr<W: AsyncWriteExt + Unpin, P: Progress>(
1709    w: &mut W,
1710    mut prog: P,
1711) -> Result<P::T, P::Error> {
1712    while let Some(stderr) = prog.next().await? {
1713        wire::write_stderr(w, Some(stderr)).await?;
1714    }
1715    wire::write_stderr(w, None).await?;
1716    prog.result().await
1717}
1718
1719#[cfg(test)]
1720mod tests {
1721    use super::*;
1722
1723    // Sanity check for version comparisons.
1724    #[test]
1725    fn test_version_ord() {
1726        assert!(Proto(0, 1) > Proto(0, 0));
1727        assert!(Proto(1, 0) > Proto(0, 0));
1728        assert!(Proto(1, 0) > Proto(0, 1));
1729        assert!(Proto(1, 1) > Proto(1, 0));
1730    }
1731}