nix_remote/
worker_op.rs

1//! Worker ops from the Nix protocol.
2
3use serde::{Deserialize, Serialize};
4use std::io::Read;
5use std::io::Write;
6use std::ops::{Deref, DerefMut};
7use tagged_serde::TaggedSerde;
8
9use crate::framed_data;
10use crate::nar::Nar;
11use crate::{
12    serialize::{NixDeserializer, NixSerializer},
13    NarHash, NixString, Result, StorePath, StorePathSet, StringSet, ValidPathInfoWithPath,
14};
15use crate::{DerivedPath, Path, PathSet, Realisation, RealisationSet};
16
17/// A zero-sized marker type. Its job is to mark the expected response
18/// type for each worker op.
19#[derive(Debug, Serialize, Deserialize)]
20pub struct Resp<T> {
21    #[serde(skip)]
22    marker: std::marker::PhantomData<T>,
23}
24
25impl<T> Resp<T> {
26    pub fn ty(&self, v: T) -> T {
27        v
28    }
29}
30
31#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
32pub struct Plain<T>(pub T);
33
34impl<T> Deref for Plain<T> {
35    type Target = T;
36
37    fn deref(&self) -> &Self::Target {
38        &self.0
39    }
40}
41
42#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
43pub struct WithFramedSource<T>(pub T);
44
45impl<T> Deref for WithFramedSource<T> {
46    type Target = T;
47
48    fn deref(&self) -> &Self::Target {
49        &self.0
50    }
51}
52
53impl<T> DerefMut for WithFramedSource<T> {
54    fn deref_mut(&mut self) -> &mut Self::Target {
55        &mut self.0
56    }
57}
58
59pub trait Stream {
60    fn stream(&self, read: &mut impl Read, write: &mut impl Write) -> anyhow::Result<()>;
61}
62
63impl<T> Stream for WithFramedSource<T> {
64    fn stream(&self, read: &mut impl Read, write: &mut impl Write) -> anyhow::Result<()> {
65        framed_data::stream(read, write)
66    }
67}
68
69impl<T> Stream for Plain<T> {
70    fn stream(&self, _read: &mut impl Read, _write: &mut impl Write) -> anyhow::Result<()> {
71        Ok(())
72    }
73}
74
75/// The worker ops of the nix protocol.
76///
77/// The second argument in each variant is a tag denoting the expected return value.
78///
79/// On the wire, they are represented as the opcode followed by the body.
80#[derive(Debug, TaggedSerde)]
81pub enum WorkerOp {
82    #[tagged_serde = 1]
83    IsValidPath(Plain<StorePath>, Resp<bool>),
84    #[tagged_serde = 6]
85    QueryReferrers(Plain<StorePath>, Resp<StorePathSet>),
86    #[tagged_serde = 7]
87    AddToStore(WithFramedSource<AddToStore>, Resp<ValidPathInfoWithPath>),
88    #[tagged_serde = 9]
89    BuildPaths(Plain<BuildPaths>, Resp<u64>),
90    #[tagged_serde = 10]
91    EnsurePath(Plain<StorePath>, Resp<u64>),
92    #[tagged_serde = 11]
93    AddTempRoot(Plain<StorePath>, Resp<u64>),
94    #[tagged_serde = 14]
95    FindRoots(Plain<()>, Resp<FindRootsResponse>),
96    #[tagged_serde = 19]
97    SetOptions(Plain<SetOptions>, Resp<()>),
98    #[tagged_serde = 20]
99    CollectGarbage(Plain<CollectGarbage>, Resp<CollectGarbageResponse>),
100    #[tagged_serde = 23]
101    QueryAllValidPaths(Plain<()>, Resp<StorePathSet>),
102    #[tagged_serde = 26]
103    QueryPathInfo(Plain<StorePath>, Resp<QueryPathInfoResponse>),
104    #[tagged_serde = 29]
105    QueryPathFromHashPart(Plain<NixString>, Resp<OptionalStorePath>),
106    #[tagged_serde = 31]
107    QueryValidPaths(Plain<QueryValidPaths>, Resp<StorePathSet>),
108    #[tagged_serde = 32]
109    QuerySubstitutablePaths(Plain<StorePathSet>, Resp<StorePathSet>),
110    #[tagged_serde = 33]
111    QueryValidDerivers(Plain<StorePath>, Resp<StorePathSet>),
112    #[tagged_serde = 34]
113    OptimiseStore(Plain<()>, Resp<u64>),
114    #[tagged_serde = 35]
115    VerifyStore(Plain<VerifyStore>, Resp<bool>),
116    #[tagged_serde = 36]
117    BuildDerivation(Plain<BuildDerivation>, Resp<BuildResult>),
118    #[tagged_serde = 37]
119    AddSignatures(Plain<AddSignatures>, Resp<u64>),
120    #[tagged_serde = 38]
121    NarFromPath(Plain<StorePath>, Resp<Nar>),
122    #[tagged_serde = 39]
123    AddToStoreNar(WithFramedSource<AddToStoreNar>, Resp<()>),
124    #[tagged_serde = 40]
125    QueryMissing(Plain<QueryMissing>, Resp<QueryMissingResponse>),
126    #[tagged_serde = 41]
127    QueryDerivationOutputMap(Plain<StorePath>, Resp<DerivationOutputMap>),
128    #[tagged_serde = 42]
129    RegisterDrvOutput(Plain<Realisation>, Resp<()>),
130    #[tagged_serde = 43]
131    QueryRealisation(Plain<NixString>, Resp<RealisationSet>),
132    #[tagged_serde = 44]
133    AddMultipleToStore(WithFramedSource<AddMultipleToStore>, Resp<()>),
134    #[tagged_serde = 45]
135    AddBuildLog(WithFramedSource<AddBuildLog>, Resp<u64>),
136    #[tagged_serde = 46]
137    BuildPathsWithResults(Plain<BuildPaths>, Resp<Vec<(DerivedPath, BuildResult)>>),
138}
139
/// Invokes the given macro once, passing it the comma-separated names of all
/// `WorkerOp` variants in declaration order.
///
/// Usage: `for_each_op!(some_macro!)`. The callback receives the bare variant
/// identifiers and is free to expand them however it likes (typically into
/// the arms of a `match`).
macro_rules! for_each_op {
    ($callback:ident !) => {
        $callback!(
            IsValidPath,
            QueryReferrers,
            AddToStore,
            BuildPaths,
            EnsurePath,
            AddTempRoot,
            FindRoots,
            SetOptions,
            CollectGarbage,
            QueryAllValidPaths,
            QueryPathInfo,
            QueryPathFromHashPart,
            QueryValidPaths,
            QuerySubstitutablePaths,
            QueryValidDerivers,
            OptimiseStore,
            VerifyStore,
            BuildDerivation,
            AddSignatures,
            NarFromPath,
            AddToStoreNar,
            QueryMissing,
            QueryDerivationOutputMap,
            RegisterDrvOutput,
            QueryRealisation,
            AddMultipleToStore,
            AddBuildLog,
            BuildPathsWithResults
        )
    };
}
174
175impl Stream for WorkerOp {
176    fn stream(&self, read: &mut impl Read, write: &mut impl Write) -> anyhow::Result<()> {
177        eprintln!("streaming worker op");
178        macro_rules! stream {
179            ($($name:ident),*) => {
180                match self {
181                    $(WorkerOp::$name(op, _resp) => {
182                        op.stream(read, write)?;
183                    },)*
184                }
185            };
186        }
187
188        for_each_op!(stream!);
189        Ok(())
190    }
191}
192
193impl WorkerOp {
194    pub fn proxy_response(&self, mut read: impl Read, mut write: impl Write) -> Result<()> {
195        let mut deser = NixDeserializer { read: &mut read };
196        let mut ser = NixSerializer { write: &mut write };
197        let mut dbg_buf = Vec::new();
198        let mut dbg_ser = NixSerializer {
199            write: &mut dbg_buf,
200        };
201        macro_rules! respond {
202            ($($name:ident),*) => {
203                #[allow(unreachable_patterns)]
204                match self {
205                    // Special case for NarFromPath because the response could be large
206                    // and needs to be streamed instead of read into memory.
207                    WorkerOp::NarFromPath(_inner, _resp) => {
208                      crate::nar::stream(&mut deser.read, &mut ser.write)?;
209                    }
210                    $(WorkerOp::$name(_inner, resp) => {
211                        let reply = resp.ty(<_>::deserialize(&mut deser)?);
212                        eprintln!("read reply {reply:?}");
213
214                        reply.serialize(&mut dbg_ser)?;
215                        reply.serialize(&mut ser)?;
216                    },)*
217                }
218            };
219        }
220
221        for_each_op!(respond!);
222        Ok(())
223    }
224}
225
226type Time = u64;
227type OptionalStorePath = StorePath;
228
229#[derive(Debug, Clone, Copy, TaggedSerde, PartialEq, Eq)]
230pub enum Verbosity {
231    #[tagged_serde = 0]
232    Error,
233    #[tagged_serde = 1]
234    Warn,
235    #[tagged_serde = 2]
236    Notice,
237    #[tagged_serde = 3]
238    Info,
239    #[tagged_serde = 4]
240    Talkative,
241    #[tagged_serde = 5]
242    Chatty,
243    #[tagged_serde = 6]
244    Debug,
245    #[tagged_serde = 7]
246    Vomit,
247}
248
249#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
250pub struct SetOptions {
251    pub keep_failing: bool,
252    pub keep_going: bool,
253    pub try_fallback: bool,
254    pub verbosity: Verbosity,
255    pub max_build_jobs: u64,
256    pub max_silent_time: Time,
257    _use_build_hook: u64,
258    pub build_verbosity: Verbosity,
259    _log_type: u64,
260    _print_build_trace: u64,
261    pub build_cores: u64,
262    pub use_substitutes: bool,
263    pub options: Vec<(NixString, NixString)>,
264}
265
266#[derive(Debug, Clone, Deserialize, Serialize)]
267pub struct AddToStore {
268    pub name: StorePath,
269    pub cam_str: StorePath,
270    pub refs: StorePathSet,
271    pub repair: bool,
272}
273
274#[derive(Debug, Clone, Copy, TaggedSerde)]
275pub enum BuildMode {
276    #[tagged_serde = 0]
277    Normal,
278    #[tagged_serde = 1]
279    Repair,
280    #[tagged_serde = 2]
281    Check,
282}
283
284#[derive(Debug, Clone, Deserialize, Serialize)]
285pub struct BuildPaths {
286    pub paths: Vec<StorePath>,
287    pub build_mode: BuildMode,
288}
289
290#[derive(Debug, Clone, Deserialize, Serialize)]
291pub struct QueryMissing {
292    pub paths: Vec<StorePath>,
293}
294
295#[derive(Debug, Clone, Deserialize, Serialize)]
296pub struct QueryPathInfoResponse {
297    pub path: Option<ValidPathInfo>,
298}
299
300#[derive(Debug, Clone, Deserialize, Serialize)]
301pub struct QueryMissingResponse {
302    pub will_build: StorePathSet,
303    pub will_substitute: StorePathSet,
304    pub unknown: StorePathSet,
305    pub download_size: u64,
306    pub nar_size: u64,
307}
308
309#[derive(Debug, Clone, Copy, TaggedSerde)]
310pub enum BuildStatus {
311    #[tagged_serde = 0]
312    Built,
313    #[tagged_serde = 1]
314    Substituted,
315    #[tagged_serde = 2]
316    AlreadyValid,
317    #[tagged_serde = 3]
318    PermanentFailure,
319    #[tagged_serde = 4]
320    InputRejected,
321    #[tagged_serde = 5]
322    OutputRejected,
323    #[tagged_serde = 6]
324    TransientFailure,
325    #[tagged_serde = 7]
326    CachedFailure,
327    #[tagged_serde = 8]
328    TimedOut,
329    #[tagged_serde = 9]
330    MiscFailure,
331    #[tagged_serde = 10]
332    DependencyFailed,
333    #[tagged_serde = 11]
334    LogLimitExceeded,
335    #[tagged_serde = 12]
336    NotDeterministic,
337    #[tagged_serde = 13]
338    ResolvesToAlreadyValid,
339    #[tagged_serde = 14]
340    NoSubstituters,
341}
342
343#[derive(Debug, Clone, Deserialize, Serialize)]
344pub struct BuildResult {
345    pub status: BuildStatus,
346    pub error_msg: NixString,
347    pub times_built: u64,
348    pub is_non_deterministic: bool,
349    pub start_time: Time,
350    pub stop_time: Time,
351    pub built_outputs: DrvOutputs,
352}
353
354// TODO: first NixString is a DrvOutput; second is a Realisation
355#[derive(Debug, Clone, Deserialize, Serialize)]
356pub struct DrvOutputs(pub Vec<(NixString, Realisation)>);
357
358#[derive(Debug, Clone, Deserialize, Serialize)]
359pub struct CollectGarbage {
360    pub action: GcAction,
361    pub paths_to_delete: StorePathSet,
362    pub ignore_liveness: bool,
363    pub max_freed: u64,
364    _obsolete0: u64,
365    _obsolete1: u64,
366    _obsolete2: u64,
367}
368
369#[derive(Debug, Clone, Deserialize, Serialize)]
370pub struct DerivationOutputMap {
371    pub paths: Vec<(NixString, OptionalStorePath)>,
372}
373
374#[derive(Debug, Clone, Deserialize, Serialize)]
375pub struct CollectGarbageResponse {
376    pub paths: PathSet,
377    pub bytes_freed: u64,
378    _obsolete: u64,
379}
380
381#[derive(Debug, Clone, TaggedSerde, Default)]
382pub enum GcAction {
383    #[tagged_serde = 0]
384    ReturnLive,
385    #[tagged_serde = 1]
386    ReturnDead,
387    #[default]
388    #[tagged_serde = 2]
389    DeleteDead,
390    #[tagged_serde = 3]
391    DeleteSpecific,
392}
393
394#[derive(Debug, Clone, Deserialize, Serialize)]
395pub struct AddToStoreNar {
396    pub path: StorePath,
397    pub deriver: OptionalStorePath,
398    pub nar_hash: NixString,
399    pub references: StorePathSet,
400    pub registration_time: Time,
401    pub nar_size: u64,
402    pub ultimate: bool,
403    pub sigs: StringSet,
404    pub content_address: RenderedContentAddress,
405    pub repair: bool,
406    pub dont_check_sigs: bool,
407}
408
409#[derive(Debug, Clone, Deserialize, Serialize)]
410pub struct FindRootsResponse {
411    pub roots: Vec<(Path, StorePath)>,
412}
413
414#[derive(Debug, Clone, Deserialize, Serialize)]
415pub struct QueryValidPaths {
416    pub paths: StorePathSet,
417    pub builders_use_substitutes: bool,
418}
419
420#[derive(Debug, Clone, Deserialize, Serialize)]
421pub struct AddMultipleToStore {
422    pub repair: bool,
423    pub dont_check_sigs: bool,
424}
425
426#[derive(Clone, Debug, Serialize, Deserialize)]
427pub struct ValidPathInfo {
428    pub deriver: OptionalStorePath,
429    pub hash: NarHash,
430    pub references: StorePathSet,
431    pub registration_time: Time, // In seconds, since the epoch
432    pub nar_size: u64,
433    pub ultimate: bool,
434    pub sigs: StringSet,
435    pub content_address: RenderedContentAddress, // Can be empty
436}
437
438type RenderedContentAddress = NixString;
439
440#[derive(Clone, Debug, Serialize, Deserialize)]
441pub struct VerifyStore {
442    pub check_contents: bool,
443    pub repair: bool,
444}
445
446#[derive(Clone, Debug, Serialize, Deserialize)]
447pub struct AddSignatures {
448    pub path: StorePath,
449    pub signatures: StringSet,
450}
451
452#[derive(Clone, Debug, Serialize, Deserialize)]
453pub struct AddBuildLog {
454    pub path: StorePath,
455}
456
457#[derive(Clone, Debug, Serialize, Deserialize)]
458pub struct BuildDerivation {
459    pub store_path: StorePath,
460    pub derivation: Derivation,
461    pub build_mode: BuildMode,
462}
463
464#[derive(Clone, Debug, Serialize, Deserialize)]
465pub struct Derivation {
466    pub outputs: Vec<(NixString, DerivationOutput)>,
467    pub input_sources: StorePathSet,
468    pub platform: NixString,
469    pub builder: Path,
470    pub args: StringSet,
471    pub env: Vec<(NixString, NixString)>,
472}
473
474#[derive(Clone, Debug, Serialize, Deserialize)]
475pub struct DerivationOutput {
476    pub store_path: StorePath,
477    pub method_or_hash: NixString,
478    pub hash_or_impure: NixString,
479}
480
#[cfg(test)]
mod tests {
    use serde_bytes::ByteBuf;

    // Brings in `SetOptions`, `Verbosity`, `NixString`, the (de)serializers
    // and the serde traits from the parent module.
    use super::*;

    /// Builds a `NixString` holding a copy of `bytes`.
    fn nix_string(bytes: &[u8]) -> NixString {
        NixString(ByteBuf::from(bytes.to_vec()))
    }

    /// A `SetOptions` value must come back unchanged after being serialized
    /// and then deserialized again.
    #[test]
    fn test_serialize() {
        let original = SetOptions {
            keep_failing: true,
            keep_going: false,
            try_fallback: true,
            verbosity: Verbosity::Vomit,
            max_build_jobs: 77,
            max_silent_time: 77,
            _use_build_hook: 77,
            build_verbosity: Verbosity::Error,
            _log_type: 77,
            _print_build_trace: 77,
            build_cores: 77,
            use_substitutes: false,
            options: vec![(nix_string(b"buf1"), nix_string(b"buf2"))],
        };

        let mut wire = std::io::Cursor::new(Vec::new());
        let mut encoder = NixSerializer { write: &mut wire };
        original.serialize(&mut encoder).unwrap();

        // Rewind so the deserializer reads back what was just written.
        wire.set_position(0);
        let mut decoder = NixDeserializer { read: &mut wire };
        let decoded = SetOptions::deserialize(&mut decoder).unwrap();
        assert_eq!(original, decoded);
    }
}