cindy-macros 0.1.1

Managing infrastructure at breakneck speed.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
use proc_macro::TokenStream;
use quote::quote;
use syn::{FnArg, Item, ItemFn, Pat, PatIdent, parse_macro_input};

/// Shorthand for deriving Debug and serde::{Deserialize, Serialize}
///
/// Prevents explicit `serde` dependency
#[proc_macro_attribute]
pub fn wire(_args: TokenStream, input: TokenStream) -> TokenStream {
    let item = parse_macro_input!(input as Item);

    quote! {
        #[derive(
            cindy::__reexports::serde::Serialize,
            cindy::__reexports::serde::Deserialize
        )]
        #[serde(crate = "cindy::__reexports::serde")]
        #item
    }
    .into()
}

/// Specifies an entrypoint.
///
/// - On `feature = "orchestrator"`
///   - Compiles to normal program.
/// - On `feature = "remote"`
///   - Runs a RPC loop instead.
///
/// The user's `main` may be declared in one of two shapes:
///
/// 1. `async fn main() -> Result<()>` — legacy, no host context.
/// 2. `async fn main(host: cindy::Host<V>) -> Result<()>` — the macro
///    reads the `CINDY_HOST_CONTEXT` env var (JSON serialized
///    `cindy::Host<V>` produced by the CLI for this particular target)
///    and hands it pre-parsed to the body. Any `Serialize +
///    DeserializeOwned` `V` works; use `cindy::Host<()>` if you only
///    need `name`/`tags`.
#[proc_macro_attribute]
pub fn main(_args: TokenStream, input: TokenStream) -> TokenStream {
    let input = parse_macro_input!(input as ItemFn);

    let (attrs, vis, sig, block) = (&input.attrs, &input.vis, &input.sig, &input.block);
    let output = &sig.output;
    let inputs = &sig.inputs;

    // Decide how to invoke `__user_main` based on whether the user
    // declared a host parameter. Exactly zero or one parameters is
    // supported; a single param is always treated as the host context
    // and its declared type is used as the deserialization target.
    let invoke_user_main = match inputs.len() {
        0 => quote! {
            cindy::__reexports::tokio::spawn(__user_main())
        },
        1 => {
            let host_type = match &inputs[0] {
                FnArg::Typed(pt) => &pt.ty,
                FnArg::Receiver(_) => {
                    panic!("`#[cindy::main]` cannot take `self`");
                }
            };
            quote! {
                {
                    let __host_json = ::std::env::var("CINDY_HOST_CONTEXT").expect(
                        "CINDY_HOST_CONTEXT not set!\n\
                         The orchestrator process is meant to be launched by `cindy` command line tool. \
                         If you're trying to run the binary directly, set CINDY_HOST_CONTEXT to a \
                         JSON-serialised `cindy::Host<V>` first."
                    );
                    let __host: #host_type =
                        cindy::__reexports::serde_json::from_str(&__host_json)
                            .expect("CINDY_HOST_CONTEXT was not valid JSON for the declared `cindy::Host<V>` type");
                    cindy::__reexports::tokio::spawn(__user_main(__host))
                }
            }
        }
        _ => panic!(
            "`#[cindy::main]` accepts at most one parameter (the host context, `cindy::Host<V>`)"
        ),
    };

    quote! {
        #(#attrs)*
        #[cindy::__reexports::tokio::main(crate = "cindy::__reexports::tokio")]
        #vis async fn main() {
            let (rpc_in, rpc_out) = cindy::common::quarantine_stdio();

            #[cfg(feature = "orchestrator")]
            if ::std::env::var_os("CINDY_DUMP_INVENTORY").is_some() {
                let entries: ::std::vec::Vec<&cindy::inventory_types::RegisteredInventory> =
                    cindy::__reexports::inventory::iter::<cindy::inventory_types::RegisteredInventory>
                        .into_iter()
                        .collect();
                let value: cindy::__reexports::serde_json::Value = match entries.as_slice() {
                    [one] => (one.dump)().await,
                    [] => {
                        ::std::eprintln!(
                            "no `#[cindy::inventory]` registered in this binary — \
                             define exactly one inventory function"
                        );
                        ::std::process::exit(2);
                    }
                    _ => {
                        ::std::eprintln!(
                            "multiple `#[cindy::inventory]` functions registered \
                             ({} found); exactly one is supported",
                            entries.len()
                        );
                        ::std::process::exit(2);
                    }
                };
                let bytes = cindy::__reexports::serde_json::to_vec(&value)
                    .expect("Failed to serialise inventory to JSON");
                {
                    use cindy::__reexports::tokio::io::AsyncWriteExt as _;
                    let mut out = rpc_out;
                    out.write_all(&bytes).await.expect("Failed to write inventory");
                    out.flush().await.expect("Failed to flush inventory");
                }
                ::std::process::exit(0);
            }

            #[cfg(feature = "orchestrator")]
            if ::std::env::var_os("CINDY_SEAL_SECRETS").is_some() {
                use cindy::__reexports::tokio::io::AsyncWriteExt as _;
                let mut out = rpc_out;
                let mut failed = false;
                for pending in cindy::__reexports::inventory::iter::<cindy::secret::PendingSecret>() {
                    let plaintext = (pending.serialize)();
                    // Refuse to bootstrap a vault on the fly. If the
                    // user didn't `cindy secret vault create <name>`
                    // before running seal, that's almost certainly a
                    // mistake — they probably meant to copy an
                    // existing team-shared key file into place. Bail
                    // with the (already actionable) error from the
                    // keychain layer instead of generating a fresh
                    // key whose ciphertext nobody else can decrypt.
                    let dek = match cindy::secret::keychain::get_dek(pending.vault) {
                        Ok(d) => d,
                        Err(e) => {
                            ::std::eprintln!(
                                "cindy secret seal: couldn't load DEK for vault `{}` \
                                 (referenced from {}:{}:{}): {e:#}",
                                pending.vault, pending.file, pending.line, pending.column,
                            );
                            failed = true;
                            continue;
                        }
                    };
                    let ciphertext = match cindy::secret::crypto::seal(&dek, &plaintext) {
                        Ok(c) => c,
                        Err(e) => {
                            ::std::eprintln!(
                                "cindy secret seal: encryption failed for {}:{}:{} ({e:#})",
                                pending.file, pending.line, pending.column,
                            );
                            failed = true;
                            continue;
                        }
                    };
                    use cindy::__reexports::base64::Engine as _;
                    let b64 = cindy::__reexports::base64::engine::general_purpose::STANDARD
                        .encode(&ciphertext);
                    let line = cindy::__reexports::serde_json::json!({
                        "file":       pending.file,
                        "line":       pending.line,
                        "column":     pending.column,
                        "vault":      pending.vault,
                        "ciphertext": b64,
                    });
                    let mut bytes = cindy::__reexports::serde_json::to_vec(&line)
                        .expect("Failed to serialise seal record");
                    bytes.push(b'\n');
                    out.write_all(&bytes).await.expect("Failed to write seal record");
                }
                out.flush().await.expect("Failed to flush seal records");
                ::std::process::exit(if failed { 2 } else { 0 });
            }

            // Remote-only builds skip the user's body entirely; the worker
            // process just runs the RPC dispatch loop.
            #[cfg(all(feature = "remote", not(feature = "orchestrator")))]
            {
                cindy::remote::rpc(rpc_in, rpc_out).await;
                ::std::process::exit(0);
            }

            // Orchestrator builds (and dual-feature LSP builds) set up the
            // dispatch channel and then execute the user's main body. Using a
            // *positive* cfg here — rather than `not(remote)` — keeps the
            // body in scope when both features are enabled at once, which is
            // how rust-analyzer typically evaluates this workspace.
            #[cfg(feature = "orchestrator")]
            {
                async fn __user_main(#inputs) #output #block

                let (tx, rx) = cindy::__reexports::tokio::sync::mpsc::unbounded_channel();
                cindy::orchestrator::ORCHESTRATOR_TX
                    .set(tx)
                    .expect("ORCHESTRATOR_TX already set");
                cindy::__reexports::tokio::spawn(cindy::orchestrator::rpc(rx, rpc_in, rpc_out));
                match #invoke_user_main.await {
                    Ok(Ok(_)) => {
                        ::std::process::exit(0);
                    }
                    Ok(Err(e)) => {
                        ::std::eprintln!("\x1b[31m{:?}\x1b[0m", e);
                        ::std::process::exit(1);
                    }
                    Err(_) => {
                        ::std::process::exit(1);
                    }
                };
            }
        }
    }
    .into()
}

/// Registers a user-defined inventory function.
///
/// Exactly one `#[cindy::inventory]` should exist per binary. The
/// function may be sync or async, and may return either
/// `cindy::Inventory<V>` or `cindy::Result<cindy::Inventory<V>>`. The
/// `#[cindy::main]`-generated `main` invokes it whenever the orchestrator
/// binary is launched with `CINDY_DUMP_INVENTORY=1`, serialises the
/// result to JSON, writes it to the original FD 1, and exits — the CLI
/// then drives preflight (arch discovery, cross-compilation) and
/// per-host orchestrator launches off that single JSON dump.
///
/// Async-vs-sync auto-wrapping matches `#[cindy::remote]`: sync bodies
/// are pushed onto the blocking pool via `spawn_blocking`, async bodies
/// are awaited directly. The return value is run through
/// `serde_json::to_value` regardless of `V`, so the CLI never has to
/// know the user's vars type — it consumes
/// `Inventory<serde_json::Value>` and threads the opaque `vars` field
/// straight through to per-host `CINDY_HOST_CONTEXT` env vars.
///
/// ```ignore
/// #[cindy::inventory]
/// async fn inventory() -> cindy::Result<cindy::Inventory<VyosVars>> { ... }
/// ```
#[proc_macro_attribute]
pub fn inventory(_args: TokenStream, input: TokenStream) -> TokenStream {
    let input = parse_macro_input!(input as ItemFn);

    let (attrs, vis, sig, block) = (&input.attrs, &input.vis, &input.sig, &input.block);
    let function_ident = &sig.ident;
    let asyncness = &sig.asyncness;
    let is_async = asyncness.is_some();
    let output = &sig.output;
    let inputs = &sig.inputs;

    if !inputs.is_empty() {
        panic!("`#[cindy::inventory]` functions must take no arguments");
    }

    // The user's function body is invoked from a Send future spawned by
    // the inventory-dump branch in `#[cindy::main]`. For sync bodies we
    // shuttle through `spawn_blocking` so a slow inventory build
    // (TOML read, DB hit, etc.) doesn't stall the tokio reactor — same
    // pattern as `#[cindy::remote]`.
    let invoke = if is_async {
        quote! { #function_ident().await }
    } else {
        quote! {
            match cindy::__reexports::tokio::task::spawn_blocking(|| #function_ident())
                .await
            {
                Ok(v) => v,
                Err(je) => ::std::panic::resume_unwind(je.into_panic()),
            }
        }
    };

    quote! {
        #(#attrs)*
        #vis #asyncness fn #function_ident () #output #block

        // Register with the inventory crate so `#[cindy::main]` can find
        // and dump us. `IntoInventoryDump` normalises both
        // `Inventory<V>` and `Result<Inventory<V>, E>` return types
        // into a single on-the-wire shape (`{"hosts": [...]}`), with
        // fallible variants reporting their error to stderr and
        // exiting 1 — the CLI never has to peel a `{"Ok": ...}` /
        // `{"Err": ...}` wrapper.
        cindy::__reexports::inventory::submit! {
            cindy::inventory_types::RegisteredInventory {
                dump: || ::std::boxed::Box::pin(async move {
                    let result = #invoke;
                    <_ as cindy::inventory_types::IntoInventoryDump>::into_inventory_dump(result)
                }),
            }
        }
    }
    .into()
}

/// Marks a function as "running on the remote host".
///
/// Each `#[cindy::remote] fn foo(args) -> T` expands to a small bundle of
/// items at the same path:
///
/// 1. **`fn foo(args) -> cindy::orchestrator::Future<T>`** — the RPC shim
///    used by `#[cindy::main]` and other orchestrator-side code. Returns a
///    future that resolves once the worker has replied. (Lives in the
///    value namespace.)
/// 2. **`enum foo {} + impl foo { fn inner(args) -> T { ..user body.. } }`**
///    — the actual body, exposed at `foo::inner(args)`. Used inside other
///    `#[remote]` bodies on the worker to call siblings directly without
///    going through the RPC channel. The enum is uninhabited; it exists
///    purely so the path syntax `foo::inner` works in the type namespace,
///    side-by-side with `fn foo` in the value namespace. (Impl blocks
///    don't shift `super::` resolution, so user bodies that reference
///    `super::Foo` keep meaning what they meant before the macro ran.)
///
/// The inventory dispatcher (built on `feature = "remote"`) registers a
/// pointer to `foo::inner`. The body lives in a real function in every
/// cfg — including dual-feature LSP builds — so type-checking,
/// diagnostics, and goto-definition all work without any
/// `__body_typecheck` shim.
///
/// **Why does sibling-call syntax need to be `foo::inner(args)` instead
/// of just `foo(args)`?** Because in dual-feature LSP builds, the bare
/// `foo` symbol *must* be the orchestrator's RPC shim so `#[cindy::main]`
/// bodies type-check (`foo(args).await`). One Rust symbol cannot
/// simultaneously be `fn(..) -> Future<T>` and `fn(..) -> T` in the same
/// scope; the `::inner` path disambiguates without ceremony at the call
/// site.
#[proc_macro_attribute]
pub fn remote(_args: TokenStream, input: TokenStream) -> TokenStream {
    let input = parse_macro_input!(input as ItemFn);

    let (attrs, vis, sig, block) = (&input.attrs, &input.vis, &input.sig, &input.block);
    let function_ident = &sig.ident;
    let asyncness = &sig.asyncness;
    let is_async = asyncness.is_some();
    let inputs = &sig.inputs;
    // TODO: forbid generics
    // sig.generics.params.is_empty();
    let return_type = match &sig.output {
        syn::ReturnType::Default => quote! { () },
        syn::ReturnType::Type(_, ty) => quote! { #ty },
    };

    let mut arg_idents = vec![];
    let mut arg_types = vec![];
    for input_arg in &sig.inputs {
        match input_arg {
            FnArg::Receiver(..) => panic!("Argument `self` not allowed"),
            FnArg::Typed(pat_type) => match &*pat_type.pat {
                Pat::Ident(PatIdent { ident, .. }) => {
                    arg_idents.push(ident);
                    arg_types.push(&pat_type.ty)
                }
                other => panic!("Only standard named arguments are supported. Found: {other:?}"),
            },
        }
    }

    let type_signature = arg_types
        .iter()
        .map(|ty| quote! { #ty }.to_string().replace(' ', ""))
        .collect::<Vec<String>>()
        .join(",");
    let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap_or_default();
    let absolute_span_path = std::path::PathBuf::from(proc_macro::Span::call_site().file());
    let relative_path = absolute_span_path
        .strip_prefix(&manifest_dir)
        .unwrap_or(&absolute_span_path)
        .to_string_lossy()
        .to_string();
    let crate_name = std::env::var("CARGO_PKG_NAME").unwrap_or_default();

    let remote_fn_id = format!(
        "::{}::{}::{}({})",
        crate_name, relative_path, function_ident, type_signature
    );

    // How the inventory wrapper invokes the user's body.
    //
    // - sync bodies are pushed onto the blocking pool so they don't stall the
    //   tokio reactor on slow syscalls.
    // - async bodies are awaited directly; the user is responsible for not
    //   doing blocking I/O inside them.
    //
    // For the sync path: if the user's body panics, `spawn_blocking`
    // surfaces it as a `JoinError`. We use `resume_unwind` (rather than a
    // simple `.expect`) so the *original* panic payload — message,
    // location, the lot — propagates up to the dispatcher's outer panic
    // catcher in `cindy::remote::rpc`, where it gets converted into a
    // `RemoteFnResponse::Panic(..)` carrying the real message.
    let invoke_user_fn = if is_async {
        quote! { #function_ident::inner( #(#arg_idents),* ).await }
    } else {
        quote! {
            match cindy::__reexports::tokio::task::spawn_blocking(move || {
                #function_ident::inner( #(#arg_idents),* )
            })
            .await
            {
                Ok(v) => v,
                Err(je) => ::std::panic::resume_unwind(je.into_panic()),
            }
        }
    };

    quote! {
        // The user's body, exposed at `#function_ident::inner`. Always
        // emitted regardless of feature flags so type-checking,
        // diagnostics, and goto-definition work in every cfg.
        //
        // The uninhabited `enum #function_ident` lives in the *type*
        // namespace; the orchestrator-side `fn #function_ident` further
        // down lives in the *value* namespace. The two coexist, which is
        // what makes `cmd(args).await` (RPC) and `cmd::inner(args)`
        // (local) work as different surface syntax at the same path.
        //
        // The `impl` block sits at the enclosing module's scope, so
        // `super::` paths in the user's signature and body resolve
        // exactly as they did pre-macro — no shift to work around.
        #[allow(non_camel_case_types)]
        #[doc(hidden)]
        #vis enum #function_ident {}

        #[doc(hidden)]
        impl #function_ident {
            /// This function can be called from another remote functions
            #(#attrs)*
            pub #asyncness fn inner (#inputs) -> #return_type #block
        }

        // Orchestrator-side RPC shim. `#[cindy::main]` users call this as
        // `function_ident(args).await` and get the worker's serialized
        // return value back.
        #[cfg(feature = "orchestrator")]
        #(#attrs)*
        #vis fn #function_ident (#inputs) -> cindy::orchestrator::Future<#return_type> {
            let uuid = cindy::__reexports::uuid::Uuid::new_v4();
            let payload = cindy::common::RemoteFnPayload {
                uuid,
                fn_id: #remote_fn_id.to_string(),
                data: cindy::__reexports::postcard::to_allocvec(&( #(#arg_idents),* ))
                    .expect("Failed to serialize args"),
            };
            let (tx, rx) = cindy::__reexports::tokio::sync::oneshot::channel();
            cindy::orchestrator::ORCHESTRATOR_TX
                .get()
                .expect("ORCHESTRATOR_TX not set")
                .send(cindy::orchestrator::OutboundRegistration { payload, tx })
                .expect("Orchestrator channel closed");
            cindy::orchestrator::Future::new(rx)
        }

        // Remote-side inventory registration. The dispatcher pulls this
        // entry out of the inventory by `fn_id` and invokes
        // `function_ident::inner` (via `spawn_blocking` for sync bodies so
        // long-running syscalls don't stall the tokio reactor).
        #[cfg(feature = "remote")]
        cindy::__reexports::inventory::submit! {
            cindy::remote::RemoteFn {
                id: #remote_fn_id,
                function: |args_bytes| {
                    let ( #(#arg_idents),* ): ( #(#arg_types),* ) =
                        cindy::__reexports::postcard::from_bytes(&args_bytes)
                        .expect("Failed to deserialize args");

                    ::std::boxed::Box::pin(async move {
                        let result = #invoke_user_fn;
                        cindy::__reexports::postcard::to_allocvec(&result)
                            .expect("Failed to serialize return value")
                    })
                },
            }
        }
    }
    .into()
}