Skip to main content

net/ffi/
aggregator.rs

1//! C FFI bindings for the aggregator-registry RPC client +
2//! fold-query client + channel-visibility setter
3//! (`SDK_AGGREGATOR_SUBNET_PLAN.md` stages 5 + 4-fold-query).
4//!
5//! Boundary conventions mirror `ffi::mesh`: opaque handles
6//! freed via dedicated `_free`, scalar ids as `u64`, JSON
7//! strings via `CString::into_raw` freed by the caller via
8//! `net_free_string`. Caller safety contract is identical to
9//! `ffi::mesh` / `ffi::cortex`; `clippy::missing_safety_doc`
10//! suppressed at the module level for the same rationale.
11#![allow(clippy::missing_safety_doc)]
12#![expect(
13    clippy::undocumented_unsafe_blocks,
14    reason = "module-wide FFI safety contract documented in ffi::mod.rs preamble"
15)]
16
17use std::ffi::{c_char, c_int, CStr, CString};
18use std::mem::ManuallyDrop;
19use std::time::Duration;
20
21use parking_lot::{Mutex as ParkingMutex, RwLock as ParkingRwLock};
22
23use super::handle_guard::{BeginFree, HandleGuard, FFI_HANDLE_FREE_DEADLINE};
24
25use crate::adapter::net::behavior::aggregator::{
26    FoldQueryClient, FoldQueryClientError, FoldQueryError, RegistryClient, RegistryClientError,
27    RegistryGroupSummary, RegistryRpcError, SummaryAnnouncement, DEFAULT_QUERY_DEADLINE,
28    DEFAULT_REGISTRY_DEADLINE,
29};
30use crate::adapter::net::{ChannelConfig, ChannelId, ChannelName, Visibility};
31
32use super::mesh::MeshNodeHandle;
33
34// ─── Error-kind discriminants (locked across SDKs) ───
35
/// The op succeeded.
pub const NET_REGISTRY_OK: i32 = 0;
/// Transport-level failure (no route, timeout, server returned
/// a non-Ok status before invoking the handler).
pub const NET_REGISTRY_ERR_TRANSPORT: i32 = 1;
/// Request serialization or response deserialization failed.
/// Also reported when a result string cannot be handed across
/// the C boundary because it contains an embedded NUL.
pub const NET_REGISTRY_ERR_CODEC: i32 = 2;
/// Server handler rejected: no template by that name.
pub const NET_REGISTRY_ERR_UNKNOWN_TEMPLATE: i32 = 3;
/// Server handler rejected: a group by that name is already
/// registered.
pub const NET_REGISTRY_ERR_DUPLICATE_GROUP_NAME: i32 = 4;
/// Server handler rejected for a daemon-defined reason
/// (config validation, replica spawn failed, etc.).
pub const NET_REGISTRY_ERR_SPAWN_REJECTED: i32 = 5;
/// Server doesn't accept dynamic spawn (read-only daemon).
pub const NET_REGISTRY_ERR_SPAWN_NOT_SUPPORTED: i32 = 6;
/// Server handler rejected: no summarizer registered under the
/// requested fold kind. Only emitted by
/// `net_fold_query_client_*` ops.
pub const NET_REGISTRY_ERR_UNKNOWN_KIND: i32 = 7;
/// Server handler rejected `Scale`: no group by that name is
/// registered on the target.
pub const NET_REGISTRY_ERR_UNKNOWN_GROUP: i32 = 8;
/// Server handler rejected `Scale` for a daemon-defined reason
/// (template mismatch, replica spawn/stop failure, etc.).
pub const NET_REGISTRY_ERR_SCALE_REJECTED: i32 = 9;
/// Server doesn't accept dynamic scale (no scale handler
/// installed).
pub const NET_REGISTRY_ERR_SCALE_NOT_SUPPORTED: i32 = 10;
/// Caller-side error: a string argument wasn't valid UTF-8 or
/// a pointer was null where one was required. The op funnels
/// also report this kind when the handle is already being
/// freed.
pub const NET_REGISTRY_ERR_INVALID_ARGS: i32 = 99;
70
71// ─── Visibility discriminants ───
72
/// Wire-equivalent of [`Visibility`]. Values are
/// representation-stable across SDK releases — operator code
/// referring to them by literal value (not just by name) stays
/// correct. Mirrors every substrate variant 1-to-1; mirror order
/// is sorted by tier-broadness for operator readability.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Copy` /
/// `Clone` so values can be logged and compared directly (for
/// example in assertions) without casting to `i32` first.
#[repr(i32)]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum NetVisibility {
    /// Mirrors [`Visibility::Global`] — visible everywhere.
    Global = 0,
    /// Mirrors [`Visibility::ParentVisible`].
    ParentVisible = 1,
    /// Mirrors [`Visibility::Exported`] — explicit per-subnet export list.
    Exported = 2,
    /// Mirrors [`Visibility::SubnetLocal`] — packets never leave the subnet.
    SubnetLocal = 3,
}
90
91impl NetVisibility {
92    fn from_raw(raw: i32) -> Option<Visibility> {
93        match raw {
94            0 => Some(Visibility::Global),
95            1 => Some(Visibility::ParentVisible),
96            2 => Some(Visibility::Exported),
97            3 => Some(Visibility::SubnetLocal),
98            _ => None,
99        }
100    }
101
102    /// Compile-time exhaustiveness check in the *opposite*
103    /// direction — every substrate [`Visibility`] variant must
104    /// have a wire-stable C ABI counterpart. If the substrate
105    /// gains a variant, this `match` stops compiling, forcing
106    /// the FFI maintainer to either add the discriminant + bump
107    /// the wire contract or explicitly accept the omission with
108    /// `_ => None`. Without this, [`from_raw`] would silently
109    /// reject the new variant and operator code referring to it
110    /// by literal value would see a NULL handle / ERR_INVALID
111    /// instead of a typed wire error.
112    #[allow(dead_code)] // existence is the check
113    fn to_raw(v: Visibility) -> NetVisibility {
114        match v {
115            Visibility::Global => NetVisibility::Global,
116            Visibility::ParentVisible => NetVisibility::ParentVisible,
117            Visibility::Exported => NetVisibility::Exported,
118            Visibility::SubnetLocal => NetVisibility::SubnetLocal,
119        }
120    }
121}
122
123// ─── Handle ───
124
/// FFI handle for a [`RegistryClient`].
///
/// The inner client is wrapped in a `RwLock` so concurrent ops
/// (entry points are called from many threads in async runtimes)
/// can share read access while a `set_deadline` writer
/// serializes. `last_error_detail` lives behind a separate
/// `parking_lot::Mutex`; [`net_registry_last_error_detail`]
/// returns an owned copy of its contents (freed with
/// `net_free_string`), so the returned pointer never aliases this
/// mutex-owned slot and can't dangle on overwrite/free.
///
/// Uses the same `HandleGuard` quiescing recipe as the
/// cortex/mesh/redis-dedup handles (see [`super::handle_guard`]):
/// the inner fields live in `ManuallyDrop`, every op gates on
/// `guard.try_enter()`, and `_free` drains in-flight ops via
/// `begin_free()` before dropping the inner — the box itself is
/// leaked, never `Box::from_raw`'d, so a `_free` racing a
/// concurrent op can't deallocate the lock out from under it.
pub struct RegistryClientHandle {
    /// The wrapped client. Ops clone it under the read lock and
    /// `set_deadline` mutates it under the write lock. Held in
    /// `ManuallyDrop` so only `net_registry_client_free` drops it.
    client: ManuallyDrop<ParkingRwLock<RegistryClient>>,
    /// Detail text of the most recent failed op, or `None` if no
    /// failure has been recorded. Also dropped only by `_free`.
    last_error_detail: ManuallyDrop<ParkingMutex<Option<CString>>>,
    /// Gate that every entry point passes through before touching
    /// the two fields above.
    guard: HandleGuard,
}
148
149// ─── Constructor / free / builder ───
150
151/// Construct a `RegistryClient` against an existing
152/// [`MeshNodeHandle`]. Returns a handle the caller frees via
153/// [`net_registry_client_free`]. Returns NULL on null input.
154#[unsafe(no_mangle)]
155pub unsafe extern "C" fn net_registry_client_new(
156    mesh_handle: *mut MeshNodeHandle,
157) -> *mut RegistryClientHandle {
158    if mesh_handle.is_null() {
159        return std::ptr::null_mut();
160    }
161    // Gated clone of the mesh node — `None` means the mesh handle is
162    // being freed concurrently; surface a null handle rather than
163    // racing the inner out of `ManuallyDrop`.
164    let Some(mesh_arc) = (unsafe { super::mesh::mesh_node_arc(&*mesh_handle) }) else {
165        return std::ptr::null_mut();
166    };
167    let boxed = Box::new(RegistryClientHandle {
168        client: ManuallyDrop::new(ParkingRwLock::new(RegistryClient::new(mesh_arc))),
169        last_error_detail: ManuallyDrop::new(ParkingMutex::new(None)),
170        guard: HandleGuard::new(),
171    });
172    Box::into_raw(boxed)
173}
174
/// Free a `RegistryClient` handle produced by
/// [`net_registry_client_new`]. Idempotent on NULL and on a
/// second call (the `begin_free` single-winner contract gates the
/// inner drop). Quiesces in-flight ops before dropping the inner;
/// the box stays leaked so a concurrent op can't UAF the handle.
///
/// If in-flight ops do not drain within
/// `FFI_HANDLE_FREE_DEADLINE`, the inner fields are deliberately
/// left undropped and a warning is logged.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_registry_client_free(handle: *mut RegistryClientHandle) {
    if handle.is_null() {
        return;
    }
    let h: &RegistryClientHandle = unsafe { &*handle };
    match h.guard.begin_free_detailed(FFI_HANDLE_FREE_DEADLINE) {
        BeginFree::Drained => {
            // SAFETY: drained; sole writable reference. Box leaked.
            // Only the two `ManuallyDrop` fields are dropped; the
            // guard and the allocation itself stay in place.
            unsafe {
                ManuallyDrop::drop(&mut (*handle).client);
                ManuallyDrop::drop(&mut (*handle).last_error_detail);
            }
        }
        // Benign repeat free — a prior call owns the inner; nothing
        // to do and nothing leaked by this call.
        BeginFree::AlreadyFreeing => {}
        // Ops are still running against the inner fields, so
        // dropping them now would be a use-after-free; leak them.
        BeginFree::TimedOut => {
            tracing::warn!(
                "net_registry_client_free: in-flight ops did not drain within deadline; \
                 leaking inner to avoid use-after-free"
            );
        }
    }
}
205
206/// Override the per-call deadline in milliseconds. `millis == 0`
207/// resets to the substrate default. Safe to call concurrently
208/// with in-flight ops; the writer takes the inner lock briefly
209/// and any concurrent reader either observes the old or the new
210/// deadline (no torn read).
211#[unsafe(no_mangle)]
212pub unsafe extern "C" fn net_registry_client_set_deadline(
213    handle: *mut RegistryClientHandle,
214    millis: u64,
215) {
216    if handle.is_null() {
217        return;
218    }
219    let h: &RegistryClientHandle = unsafe { &*handle };
220    let _op = match h.guard.try_enter() {
221        Some(op) => op,
222        None => return,
223    };
224    let deadline = if millis == 0 {
225        DEFAULT_REGISTRY_DEADLINE
226    } else {
227        Duration::from_millis(millis)
228    };
229    h.client.write().set_deadline_mut(deadline);
230}
231
232// ─── Op-handler internals ───
233//
234// Every public `net_registry_client_*` op shares the same six
235// steps: null-check, parse CStr args, snapshot the client under
236// the read lock, await the substrate call, classify+store-detail
237// on error, write the out param. The `dispatch_*` + `write_*`
238// helpers below capture each step once.
239
/// Store `kind` through `out` when `out` is non-null; do nothing
/// otherwise. Op handlers route every success / failure status
/// through here so the null-check on `out_error_kind` lives in
/// exactly one place.
#[inline]
unsafe fn write_kind(out: *mut c_int, kind: c_int) {
    if out.is_null() {
        return;
    }
    // SAFETY: non-null was just checked; the caller guarantees
    // the pointer is otherwise valid for a `c_int` write.
    unsafe { out.write(kind) };
}
249
250/// Read a NUL-terminated UTF-8 string argument and return an
251/// owned `String`, or set the out-param to `INVALID_ARGS` +
252/// return `None` if the pointer is null or the bytes aren't
253/// valid UTF-8.
254#[inline]
255unsafe fn cstr_arg(ptr: *const c_char, out: *mut c_int) -> Option<String> {
256    if ptr.is_null() {
257        unsafe { write_kind(out, NET_REGISTRY_ERR_INVALID_ARGS) };
258        return None;
259    }
260    match unsafe { CStr::from_ptr(ptr).to_str() } {
261        Ok(s) => Some(s.to_owned()),
262        Err(_) => {
263            unsafe { write_kind(out, NET_REGISTRY_ERR_INVALID_ARGS) };
264            None
265        }
266    }
267}
268
269/// Convert a JSON string into a heap-allocated `*mut c_char` the
270/// caller frees with `net_free_string`. Returns NULL + sets the
271/// out-param to `CODEC` if the string contains an embedded NUL.
272#[inline]
273unsafe fn json_to_raw(json: String, out: *mut c_int) -> *mut c_char {
274    match CString::new(json) {
275        Ok(s) => {
276            unsafe { write_kind(out, NET_REGISTRY_OK) };
277            s.into_raw()
278        }
279        Err(_) => {
280            unsafe { write_kind(out, NET_REGISTRY_ERR_CODEC) };
281            std::ptr::null_mut()
282        }
283    }
284}
285
286/// Funnel for any registry op that returns a JSON string.
287/// Takes a closure that produces `Result<String, RegistryClientError>`
288/// (the JSON-encoding step is the caller's responsibility because
289/// the substrate type varies per op).
290unsafe fn registry_op_json<F>(
291    handle: *mut RegistryClientHandle,
292    out_error_kind: *mut c_int,
293    op: F,
294) -> *mut c_char
295where
296    F: FnOnce(RegistryClient) -> Result<String, RegistryClientError>,
297{
298    if handle.is_null() {
299        unsafe { write_kind(out_error_kind, NET_REGISTRY_ERR_INVALID_ARGS) };
300        return std::ptr::null_mut();
301    }
302    let h: &RegistryClientHandle = unsafe { &*handle };
303    // Hold the guard ONLY long enough to clone the inner client (an
304    // Arc-backed handle that keeps the mesh node alive on its own).
305    // Bail with INVALID_ARGS (same shape as a NULL handle) if `_free`
306    // has begun. Dropping the guard before the blocking RPC means a
307    // concurrent `_free` never waits on the (caller-settable, possibly
308    // multi-second) op deadline — so it can't time out and leak the
309    // inner; the op simply completes against its own clone.
310    let client = match h.guard.try_enter() {
311        Some(_op) => h.client.read().clone(),
312        None => {
313            unsafe { write_kind(out_error_kind, NET_REGISTRY_ERR_INVALID_ARGS) };
314            return std::ptr::null_mut();
315        }
316    };
317    match op(client) {
318        Ok(json) => unsafe { json_to_raw(json, out_error_kind) },
319        Err(e) => {
320            let (kind, detail) = classify(&e);
321            // Re-enter only to record the detail; if the handle is now
322            // being freed, drop it (a freed handle won't be queried).
323            if let Some(_op) = h.guard.try_enter() {
324                store_error_detail(h, detail);
325            }
326            unsafe { write_kind(out_error_kind, kind) };
327            std::ptr::null_mut()
328        }
329    }
330}
331
332// ─── Operations ───
333
334/// Enumerate groups on `target_node_id`. Returns a JSON-encoded
335/// `[RegistryGroupSummaryJson]` string the caller frees via
336/// `net_free_string`. On error, writes the error kind to
337/// `*out_error_kind` and returns NULL.
338#[unsafe(no_mangle)]
339pub unsafe extern "C" fn net_registry_client_list(
340    handle: *mut RegistryClientHandle,
341    target_node_id: u64,
342    out_error_kind: *mut c_int,
343) -> *mut c_char {
344    if out_error_kind.is_null() {
345        return std::ptr::null_mut();
346    }
347    unsafe {
348        registry_op_json(handle, out_error_kind, |client| {
349            block_on(client.list(target_node_id)).map(|groups| groups_to_json(&groups))
350        })
351    }
352}
353
354/// Spawn a new group by referencing a daemon-side template.
355/// `template_name` + `group_name` are NUL-terminated UTF-8.
356#[unsafe(no_mangle)]
357pub unsafe extern "C" fn net_registry_client_spawn(
358    handle: *mut RegistryClientHandle,
359    target_node_id: u64,
360    template_name: *const c_char,
361    group_name: *const c_char,
362    replica_count: u8,
363    out_error_kind: *mut c_int,
364) -> *mut c_char {
365    let Some(template) = (unsafe { cstr_arg(template_name, out_error_kind) }) else {
366        return std::ptr::null_mut();
367    };
368    let Some(group) = (unsafe { cstr_arg(group_name, out_error_kind) }) else {
369        return std::ptr::null_mut();
370    };
371    unsafe {
372        registry_op_json(handle, out_error_kind, |client| {
373            block_on(client.spawn(target_node_id, template, group, replica_count))
374                .map(|summary| group_to_json(&summary))
375        })
376    }
377}
378
379/// Tear down a registered group by name. Returns `1` when the
380/// group existed and was stopped, `0` when no such group was
381/// registered, `-1` on transport / codec / invalid-args
382/// failure (consult `out_error_kind`).
383#[unsafe(no_mangle)]
384pub unsafe extern "C" fn net_registry_client_unregister(
385    handle: *mut RegistryClientHandle,
386    target_node_id: u64,
387    group_name: *const c_char,
388    out_error_kind: *mut c_int,
389) -> c_int {
390    if handle.is_null() {
391        unsafe { write_kind(out_error_kind, NET_REGISTRY_ERR_INVALID_ARGS) };
392        return -1;
393    }
394    let Some(group) = (unsafe { cstr_arg(group_name, out_error_kind) }) else {
395        return -1;
396    };
397    let h: &RegistryClientHandle = unsafe { &*handle };
398    // Guard held only for the clone — see `registry_op_json` for why
399    // the blocking RPC runs unguarded.
400    let client = match h.guard.try_enter() {
401        Some(_op) => h.client.read().clone(),
402        None => {
403            unsafe { write_kind(out_error_kind, NET_REGISTRY_ERR_INVALID_ARGS) };
404            return -1;
405        }
406    };
407    match block_on(client.unregister(target_node_id, group)) {
408        Ok(existed) => {
409            unsafe { write_kind(out_error_kind, NET_REGISTRY_OK) };
410            if existed {
411                1
412            } else {
413                0
414            }
415        }
416        Err(e) => {
417            let (kind, detail) = classify(&e);
418            if let Some(_op) = h.guard.try_enter() {
419                store_error_detail(h, detail);
420            }
421            unsafe { write_kind(out_error_kind, kind) };
422            -1
423        }
424    }
425}
426
427/// Get the operator-facing detail string for the most recent
428/// non-OK op on this handle. Returns a freshly-allocated,
429/// NUL-terminated C string that the **caller owns and must free
430/// with `net_free_string`**. Returns NULL when no error has been
431/// recorded.
432///
433/// An owned copy (rather than a borrow into the handle) is
434/// deliberate: a borrowed pointer into the handle's
435/// `Mutex`-owned `CString` would dangle the moment a concurrent
436/// op overwrote the slot or `_free` dropped the inner — a
437/// use-after-free under the multi-threaded usage this handle
438/// advertises. We snapshot under the lock and hand back an
439/// independent allocation, so the returned pointer's lifetime is
440/// the caller's alone.
441#[unsafe(no_mangle)]
442pub unsafe extern "C" fn net_registry_last_error_detail(
443    handle: *mut RegistryClientHandle,
444) -> *mut c_char {
445    if handle.is_null() {
446        return std::ptr::null_mut();
447    }
448    let h: &RegistryClientHandle = unsafe { &*handle };
449    let _op = match h.guard.try_enter() {
450        Some(op) => op,
451        None => return std::ptr::null_mut(),
452    };
453    let guard = h.last_error_detail.lock();
454    match guard.as_ref() {
455        // Clone the contents out from under the lock into a fresh
456        // allocation the caller frees with `net_free_string`.
457        Some(c) => c.clone().into_raw(),
458        None => std::ptr::null_mut(),
459    }
460}
461
462// ─── Visibility setter ───
463
464/// Register a channel with a specific [`Visibility`] tier.
465/// Mirrors `Mesh::register_channel` from the Rust SDK at the C
466/// boundary. `visibility` is an [`i32`] matching the
467/// [`NetVisibility`] discriminants.
468///
469/// Returns `NET_REGISTRY_OK` on success or a typed error code.
470/// Operator-facing detail (e.g. "invalid channel name") is
471/// written to a side-channel: the substrate logs via `tracing`
472/// — no per-call detail string is allocated at this layer.
473#[unsafe(no_mangle)]
474pub unsafe extern "C" fn net_register_channel(
475    mesh_handle: *mut MeshNodeHandle,
476    name: *const c_char,
477    visibility: c_int,
478) -> c_int {
479    if mesh_handle.is_null() || name.is_null() {
480        return NET_REGISTRY_ERR_INVALID_ARGS;
481    }
482    let vis = match NetVisibility::from_raw(visibility) {
483        Some(v) => v,
484        None => return NET_REGISTRY_ERR_INVALID_ARGS,
485    };
486    let name_str = match unsafe { CStr::from_ptr(name).to_str() } {
487        Ok(s) => s,
488        Err(_) => return NET_REGISTRY_ERR_INVALID_ARGS,
489    };
490    let channel = match ChannelName::new(name_str) {
491        Ok(c) => c,
492        Err(_) => return NET_REGISTRY_ERR_INVALID_ARGS,
493    };
494    // Use the mesh's installed ChannelConfigRegistry. The
495    // mesh-FFI's net_mesh_new always installs one, so this is
496    // safe; if it ever changes, the registry being `None` is
497    // surfaced as NET_REGISTRY_ERR_INVALID_ARGS.
498    let Some(mesh_arc) = (unsafe { super::mesh::mesh_node_arc(&*mesh_handle) }) else {
499        return NET_REGISTRY_ERR_INVALID_ARGS;
500    };
501    let Some(configs) = mesh_arc.channel_configs() else {
502        return NET_REGISTRY_ERR_INVALID_ARGS;
503    };
504    let cfg = ChannelConfig::new(ChannelId::new(channel)).with_visibility(vis);
505    configs.insert(cfg);
506    NET_REGISTRY_OK
507}
508
509// ─── FoldQueryClient handle ───
510
/// FFI handle for a [`FoldQueryClient`]. Same sync model as
/// [`RegistryClientHandle`]: the inner client lives behind a
/// `RwLock` so `set_ttl` / `set_deadline` writers serialize with
/// in-flight ops, and the cache (held by the inner client's
/// `Arc<RwLock<HashMap<...>>>`) survives deadline / TTL changes.
///
/// Same `HandleGuard` quiescing recipe as
/// [`RegistryClientHandle`].
pub struct FoldQueryClientHandle {
    /// The wrapped client. Query ops clone it under the read
    /// lock; `set_ttl` / `set_deadline` mutate it under the write
    /// lock. Held in `ManuallyDrop` so only
    /// `net_fold_query_client_free` drops it.
    client: ManuallyDrop<ParkingRwLock<FoldQueryClient>>,
    /// Detail text of the most recent failed op, or `None` if no
    /// failure has been recorded. Also dropped only by `_free`.
    last_error_detail: ManuallyDrop<ParkingMutex<Option<CString>>>,
    /// Gate that every entry point passes through before touching
    /// the two fields above.
    guard: HandleGuard,
}
524
525/// Construct a `FoldQueryClient` against an existing
526/// [`MeshNodeHandle`]. Returns a handle the caller frees via
527/// [`net_fold_query_client_free`]. Returns NULL on null input.
528#[unsafe(no_mangle)]
529pub unsafe extern "C" fn net_fold_query_client_new(
530    mesh_handle: *mut MeshNodeHandle,
531) -> *mut FoldQueryClientHandle {
532    if mesh_handle.is_null() {
533        return std::ptr::null_mut();
534    }
535    let Some(mesh_arc) = (unsafe { super::mesh::mesh_node_arc(&*mesh_handle) }) else {
536        return std::ptr::null_mut();
537    };
538    let boxed = Box::new(FoldQueryClientHandle {
539        client: ManuallyDrop::new(ParkingRwLock::new(FoldQueryClient::new(mesh_arc))),
540        last_error_detail: ManuallyDrop::new(ParkingMutex::new(None)),
541        guard: HandleGuard::new(),
542    });
543    Box::into_raw(boxed)
544}
545
/// Free a `FoldQueryClient` handle. Idempotent on NULL and on a
/// second call. Quiesces in-flight ops before dropping the inner;
/// the box stays leaked so a concurrent op can't UAF the handle.
///
/// If in-flight ops do not drain within
/// `FFI_HANDLE_FREE_DEADLINE`, the inner fields are deliberately
/// left undropped and a warning is logged.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_fold_query_client_free(handle: *mut FoldQueryClientHandle) {
    if handle.is_null() {
        return;
    }
    let h: &FoldQueryClientHandle = unsafe { &*handle };
    match h.guard.begin_free_detailed(FFI_HANDLE_FREE_DEADLINE) {
        BeginFree::Drained => {
            // SAFETY: drained; sole writable reference. Box leaked.
            // Only the two `ManuallyDrop` fields are dropped; the
            // guard and the allocation itself stay in place.
            unsafe {
                ManuallyDrop::drop(&mut (*handle).client);
                ManuallyDrop::drop(&mut (*handle).last_error_detail);
            }
        }
        // Benign repeat free — a prior call owns the inner; nothing
        // to do and nothing leaked by this call.
        BeginFree::AlreadyFreeing => {}
        // Ops are still running against the inner fields, so
        // dropping them now would be a use-after-free; leak them.
        BeginFree::TimedOut => {
            tracing::warn!(
                "net_fold_query_client_free: in-flight ops did not drain within deadline; \
                 leaking inner to avoid use-after-free"
            );
        }
    }
}
574
575/// Override the cache TTL in milliseconds. `millis == 0` disables
576/// the cache entirely. Mutates in place — the warmed cache
577/// survives the adjustment.
578#[unsafe(no_mangle)]
579pub unsafe extern "C" fn net_fold_query_client_set_ttl(
580    handle: *mut FoldQueryClientHandle,
581    millis: u64,
582) {
583    if handle.is_null() {
584        return;
585    }
586    let h: &FoldQueryClientHandle = unsafe { &*handle };
587    let _op = match h.guard.try_enter() {
588        Some(op) => op,
589        None => return,
590    };
591    h.client.write().set_ttl_mut(Duration::from_millis(millis));
592}
593
594/// Override the per-call deadline in milliseconds. `millis == 0`
595/// resets to the substrate default. Mutates in place.
596#[unsafe(no_mangle)]
597pub unsafe extern "C" fn net_fold_query_client_set_deadline(
598    handle: *mut FoldQueryClientHandle,
599    millis: u64,
600) {
601    if handle.is_null() {
602        return;
603    }
604    let h: &FoldQueryClientHandle = unsafe { &*handle };
605    let _op = match h.guard.try_enter() {
606        Some(op) => op,
607        None => return,
608    };
609    let deadline = if millis == 0 {
610        DEFAULT_QUERY_DEADLINE
611    } else {
612        Duration::from_millis(millis)
613    };
614    h.client.write().set_deadline_mut(deadline);
615}
616
617/// Query the aggregator's latest cached summaries. Cache hit
618/// returns immediately; miss issues a wire RPC, caches the
619/// response, and returns. Returns a JSON-encoded
620/// `[SummaryAnnouncementJson]` string the caller frees via
621/// `net_free_string`.
622#[unsafe(no_mangle)]
623pub unsafe extern "C" fn net_fold_query_client_query_latest(
624    handle: *mut FoldQueryClientHandle,
625    target_node_id: u64,
626    kind: u16,
627    out_error_kind: *mut c_int,
628) -> *mut c_char {
629    if out_error_kind.is_null() {
630        return std::ptr::null_mut();
631    }
632    unsafe {
633        fold_query_op_json(handle, out_error_kind, |client| {
634            block_on(client.query_latest(target_node_id, kind))
635                .map(|summaries| summaries_to_json(&summaries))
636        })
637    }
638}
639
640/// Force a fresh `SummarizeNow` query — never cached.
641#[unsafe(no_mangle)]
642pub unsafe extern "C" fn net_fold_query_client_query_summarize_now(
643    handle: *mut FoldQueryClientHandle,
644    target_node_id: u64,
645    kind: u16,
646    out_error_kind: *mut c_int,
647) -> *mut c_char {
648    if out_error_kind.is_null() {
649        return std::ptr::null_mut();
650    }
651    unsafe {
652        fold_query_op_json(handle, out_error_kind, |client| {
653            block_on(client.query_summarize_now(target_node_id, kind))
654                .map(|summaries| summaries_to_json(&summaries))
655        })
656    }
657}
658
659/// Drop every cached entry.
660#[unsafe(no_mangle)]
661pub unsafe extern "C" fn net_fold_query_client_invalidate_cache(
662    handle: *mut FoldQueryClientHandle,
663) {
664    if handle.is_null() {
665        return;
666    }
667    let h: &FoldQueryClientHandle = unsafe { &*handle };
668    let _op = match h.guard.try_enter() {
669        Some(op) => op,
670        None => return,
671    };
672    h.client.read().invalidate_cache();
673}
674
675/// Drop only cache entries matching `target_node_id`.
676#[unsafe(no_mangle)]
677pub unsafe extern "C" fn net_fold_query_client_invalidate_target(
678    handle: *mut FoldQueryClientHandle,
679    target_node_id: u64,
680) {
681    if handle.is_null() {
682        return;
683    }
684    let h: &FoldQueryClientHandle = unsafe { &*handle };
685    let _op = match h.guard.try_enter() {
686        Some(op) => op,
687        None => return,
688    };
689    h.client.read().invalidate_target(target_node_id);
690}
691
692/// Operator-facing detail string for the most recent non-OK
693/// fold-query op. Returns a freshly-allocated, caller-owned C
694/// string to be freed with `net_free_string` (NULL if no error
695/// recorded). Same owned-copy rationale as
696/// [`net_registry_last_error_detail`] — never a borrow into the
697/// handle, which would dangle on a concurrent overwrite/free.
698#[unsafe(no_mangle)]
699pub unsafe extern "C" fn net_fold_query_last_error_detail(
700    handle: *mut FoldQueryClientHandle,
701) -> *mut c_char {
702    if handle.is_null() {
703        return std::ptr::null_mut();
704    }
705    let h: &FoldQueryClientHandle = unsafe { &*handle };
706    let _op = match h.guard.try_enter() {
707        Some(op) => op,
708        None => return std::ptr::null_mut(),
709    };
710    let guard = h.last_error_detail.lock();
711    match guard.as_ref() {
712        Some(c) => c.clone().into_raw(),
713        None => std::ptr::null_mut(),
714    }
715}
716
717// ─── Internals ───
718
719/// Run a future to completion on the shared mesh-FFI tokio
720/// runtime. Same as `ffi::mesh::block_on` — re-uses that
721/// runtime so we don't fragment scheduling.
722fn block_on<F: std::future::Future>(future: F) -> F::Output {
723    super::mesh::block_on(future)
724}
725
726/// Funnel for any fold-query op that returns a JSON string.
727/// Mirror of [`registry_op_json`].
728unsafe fn fold_query_op_json<F>(
729    handle: *mut FoldQueryClientHandle,
730    out_error_kind: *mut c_int,
731    op: F,
732) -> *mut c_char
733where
734    F: FnOnce(FoldQueryClient) -> Result<String, FoldQueryClientError>,
735{
736    if handle.is_null() {
737        unsafe { write_kind(out_error_kind, NET_REGISTRY_ERR_INVALID_ARGS) };
738        return std::ptr::null_mut();
739    }
740    let h: &FoldQueryClientHandle = unsafe { &*handle };
741    // Guard held only for the clone — see `registry_op_json` for why
742    // the blocking RPC runs unguarded (so a long op deadline can't
743    // make a concurrent `_free` time out and leak the inner).
744    let client = match h.guard.try_enter() {
745        Some(_op) => h.client.read().clone(),
746        None => {
747            unsafe { write_kind(out_error_kind, NET_REGISTRY_ERR_INVALID_ARGS) };
748            return std::ptr::null_mut();
749        }
750    };
751    match op(client) {
752        Ok(json) => unsafe { json_to_raw(json, out_error_kind) },
753        Err(e) => {
754            let (kind, detail) = classify_fold_query(&e);
755            if let Some(_op) = h.guard.try_enter() {
756                store_fold_query_error_detail(h, detail);
757            }
758            unsafe { write_kind(out_error_kind, kind) };
759            std::ptr::null_mut()
760        }
761    }
762}
763
764fn classify_fold_query(err: &FoldQueryClientError) -> (i32, String) {
765    match err {
766        FoldQueryClientError::Transport(e) => (NET_REGISTRY_ERR_TRANSPORT, format!("{e}")),
767        FoldQueryClientError::Codec(c) => (NET_REGISTRY_ERR_CODEC, c.clone()),
768        FoldQueryClientError::Server(FoldQueryError::UnknownKind { kind }) => (
769            NET_REGISTRY_ERR_UNKNOWN_KIND,
770            format!("unknown fold kind: 0x{kind:04x}"),
771        ),
772        FoldQueryClientError::Server(FoldQueryError::DecodeFailed(s)) => {
773            (NET_REGISTRY_ERR_CODEC, format!("server decode: {s}"))
774        }
775    }
776}
777
778fn store_fold_query_error_detail(h: &FoldQueryClientHandle, detail: String) {
779    let c = match CString::new(detail) {
780        Ok(c) => c,
781        Err(_) => CString::new("invalid utf-8 in error detail").unwrap_or_default(),
782    };
783    *h.last_error_detail.lock() = Some(c);
784}
785
786fn summaries_to_json(summaries: &[SummaryAnnouncement]) -> String {
787    let wire: Vec<SummaryWire<'_>> = summaries.iter().map(SummaryWire::from).collect();
788    // `to_string` only fails on serializer-side issues — none of
789    // our wire types have non-string map keys or Float NaN — so
790    // the unwrap is unreachable. Defensive `to_string`-on-error
791    // keeps the FFI surface infallible.
792    serde_json::to_string(&wire).unwrap_or_else(|_| "[]".to_string())
793}
794
/// Test-only helper: encode a single summary as a JSON object
/// string, falling back to `{}` if serialization fails.
#[cfg(test)]
fn summary_to_json(s: &SummaryAnnouncement) -> String {
    let wire = SummaryWire::from(s);
    match serde_json::to_string(&wire) {
        Ok(json) => json,
        Err(_) => "{}".to_string(),
    }
}
799
/// Serializable view of a [`SummaryAnnouncement`] used to build
/// the JSON handed across the FFI boundary. Field names are the
/// JSON keys.
#[derive(serde::Serialize)]
struct SummaryWire<'a> {
    /// Copied from the announcement's `fold_kind`.
    fold_kind: u16,
    /// The announcement's `source_subnet`, rendered via `Display`.
    source_subnet: String,
    /// Copied from the announcement's `generation`.
    generation: u64,
    /// One entry per bucket in the announcement; names are
    /// borrowed from it rather than copied.
    buckets: Vec<BucketWire<'a>>,
}
807
808#[derive(serde::Serialize)]
809struct BucketWire<'a> {
810    name: &'a str,
811    count: u64,
812}
813
814impl<'a> From<&'a SummaryAnnouncement> for SummaryWire<'a> {
815    fn from(s: &'a SummaryAnnouncement) -> Self {
816        Self {
817            fold_kind: s.fold_kind,
818            source_subnet: format!("{}", s.source_subnet),
819            generation: s.generation,
820            buckets: s
821                .buckets
822                .iter()
823                .map(|(n, c)| BucketWire {
824                    name: n.as_str(),
825                    count: *c,
826                })
827                .collect(),
828        }
829    }
830}
831
832/// Map a `RegistryClientError` to `(error_kind, detail_string)`.
833fn classify(err: &RegistryClientError) -> (i32, String) {
834    match err {
835        RegistryClientError::Transport(e) => (NET_REGISTRY_ERR_TRANSPORT, format!("{e}")),
836        RegistryClientError::Codec(c) => (NET_REGISTRY_ERR_CODEC, c.clone()),
837        RegistryClientError::Server(RegistryRpcError::DecodeFailed(s)) => {
838            (NET_REGISTRY_ERR_CODEC, format!("server decode: {s}"))
839        }
840        RegistryClientError::Server(RegistryRpcError::UnknownTemplate(t)) => (
841            NET_REGISTRY_ERR_UNKNOWN_TEMPLATE,
842            format!("unknown template: {t}"),
843        ),
844        RegistryClientError::Server(RegistryRpcError::DuplicateGroupName(n)) => (
845            NET_REGISTRY_ERR_DUPLICATE_GROUP_NAME,
846            format!("duplicate group name: {n}"),
847        ),
848        RegistryClientError::Server(RegistryRpcError::SpawnRejected(d)) => (
849            NET_REGISTRY_ERR_SPAWN_REJECTED,
850            format!("spawn rejected: {d}"),
851        ),
852        RegistryClientError::Server(RegistryRpcError::SpawnNotSupported) => (
853            NET_REGISTRY_ERR_SPAWN_NOT_SUPPORTED,
854            "daemon is read-only (no spawn handler installed)".to_string(),
855        ),
856        RegistryClientError::Server(RegistryRpcError::UnknownGroup(g)) => (
857            NET_REGISTRY_ERR_UNKNOWN_GROUP,
858            format!("unknown group: {g}"),
859        ),
860        RegistryClientError::Server(RegistryRpcError::ScaleRejected(d)) => (
861            NET_REGISTRY_ERR_SCALE_REJECTED,
862            format!("scale rejected: {d}"),
863        ),
864        RegistryClientError::Server(RegistryRpcError::ScaleNotSupported) => (
865            NET_REGISTRY_ERR_SCALE_NOT_SUPPORTED,
866            "daemon doesn't accept dynamic scale (no scaler installed)".to_string(),
867        ),
868    }
869}
870
871fn store_error_detail(h: &RegistryClientHandle, detail: String) {
872    let c = match CString::new(detail) {
873        Ok(c) => c,
874        Err(_) => CString::new("invalid utf-8 in error detail").unwrap_or_default(),
875    };
876    *h.last_error_detail.lock() = Some(c);
877}
878
879/// Encode the wire-contract JSON for a slice of registry-group
880/// summaries via `serde_json`. The substrate type
881/// `RegistryGroupSummary` derives `Serialize` but its
882/// `group_seed: [u8; 32]` field serializes as an array of u8 —
883/// the wire contract calls for `group_seed_hex: "abab…"` (64
884/// lowercase hex chars). The proxy wire-types below handle the
885/// rename + hex encoding.
886fn groups_to_json(groups: &[RegistryGroupSummary]) -> String {
887    let wire: Vec<GroupWire<'_>> = groups.iter().map(GroupWire::from).collect();
888    serde_json::to_string(&wire).unwrap_or_else(|_| "[]".to_string())
889}
890
891fn group_to_json(g: &RegistryGroupSummary) -> String {
892    serde_json::to_string(&GroupWire::from(g)).unwrap_or_else(|_| "{}".to_string())
893}
894
895#[derive(serde::Serialize)]
896struct GroupWire<'a> {
897    name: &'a str,
898    group_seed_hex: String,
899    replicas: Vec<ReplicaWire<'a>>,
900}
901
902#[derive(serde::Serialize)]
903struct ReplicaWire<'a> {
904    generation: u64,
905    healthy: bool,
906    diagnostic: Option<&'a str>,
907    placement_node_id: Option<u64>,
908}
909
910impl<'a> From<&'a RegistryGroupSummary> for GroupWire<'a> {
911    fn from(g: &'a RegistryGroupSummary) -> Self {
912        Self {
913            name: g.name.as_str(),
914            group_seed_hex: hex::encode(g.group_seed),
915            replicas: g
916                .replicas
917                .iter()
918                .map(|r| ReplicaWire {
919                    generation: r.generation,
920                    healthy: r.healthy,
921                    diagnostic: r.diagnostic.as_deref(),
922                    placement_node_id: r.placement_node_id,
923                })
924                .collect(),
925        }
926    }
927}
928
#[cfg(test)]
mod tests {
    use super::*;

    /// Assert that `json` contains every fragment in `fragments`,
    /// naming the first missing one on failure.
    fn assert_contains_all(json: &str, fragments: &[&str]) {
        for fragment in fragments {
            assert!(
                json.contains(*fragment),
                "expected fragment {fragment} in {json}"
            );
        }
    }

    /// Every known raw discriminant decodes to the matching
    /// visibility; out-of-range values are rejected.
    #[test]
    fn visibility_round_trips_through_raw() {
        let known = [
            (0, Visibility::Global),
            (1, Visibility::ParentVisible),
            (2, Visibility::Exported),
            (3, Visibility::SubnetLocal),
        ];
        for (raw, expected) in known {
            let back = NetVisibility::from_raw(raw).expect("known discriminant");
            // Compared through their `Debug` renderings.
            assert_eq!(format!("{back:?}"), format!("{expected:?}"));
        }

        assert!(NetVisibility::from_raw(99).is_none());
        assert!(NetVisibility::from_raw(-1).is_none());
    }

    /// The group JSON carries the name, the hex seed, and every
    /// replica field, including `null` for absent optionals.
    #[test]
    fn group_to_json_includes_every_documented_field() {
        use crate::adapter::net::behavior::aggregator::RegistryReplicaSummary;

        let healthy_replica = RegistryReplicaSummary {
            generation: 42,
            healthy: true,
            diagnostic: None,
            placement_node_id: Some(0xBEEF),
        };
        let stuck_replica = RegistryReplicaSummary {
            generation: 0,
            healthy: false,
            diagnostic: Some("stuck".into()),
            placement_node_id: None,
        };
        let group = RegistryGroupSummary {
            name: "alpha".into(),
            group_seed: [0xABu8; 32],
            source_subnet: crate::adapter::net::subnet::SubnetId::GLOBAL,
            fold_kinds: vec![0x0001],
            replicas: vec![healthy_replica, stuck_replica],
        };

        let json = group_to_json(&group);
        assert_contains_all(
            &json,
            &[
                "\"name\":\"alpha\"",
                // Each byte 0xAB → "ab"; 32 of them = 64 hex chars.
                "\"group_seed_hex\":\"abababababababababababababababababababababababababababababababab\"",
                "\"generation\":42",
                "\"healthy\":true",
                "\"diagnostic\":null",
                "\"placement_node_id\":48879",
                "\"healthy\":false",
                "\"diagnostic\":\"stuck\"",
                "\"placement_node_id\":null",
            ],
        );
    }

    /// The summary JSON carries the fold kind, subnet, generation and
    /// every bucket's name and count.
    #[test]
    fn summary_to_json_includes_every_documented_field() {
        let announcement = SummaryAnnouncement {
            fold_kind: 0x42,
            source_subnet: crate::adapter::net::subnet::SubnetId::GLOBAL,
            generation: 7,
            buckets: vec![("alpha".into(), 1), ("beta".into(), 2)],
        };

        let json = summary_to_json(&announcement);
        assert_contains_all(
            &json,
            &[
                "\"fold_kind\":66",
                "\"source_subnet\":\"global\"",
                "\"generation\":7",
                "\"name\":\"alpha\"",
                "\"count\":1",
                "\"name\":\"beta\"",
                "\"count\":2",
            ],
        );
    }

    /// Each `FoldQueryClientError` variant maps to its documented
    /// discriminant.
    #[test]
    fn classify_fold_query_maps_every_variant() {
        use crate::adapter::net::mesh_rpc::RpcError;

        // Transport — anything carrying an RpcError lands on
        // NET_REGISTRY_ERR_TRANSPORT regardless of the inner kind.
        let transport = FoldQueryClientError::Transport(RpcError::NoRoute {
            target: 0,
            reason: String::new(),
        });
        let (transport_code, _) = classify_fold_query(&transport);
        assert_eq!(transport_code, NET_REGISTRY_ERR_TRANSPORT);

        let codec = FoldQueryClientError::Codec("bad".into());
        let (codec_code, _) = classify_fold_query(&codec);
        assert_eq!(codec_code, NET_REGISTRY_ERR_CODEC);

        // The detail must name the kind as zero-padded hex.
        let unknown_kind = FoldQueryClientError::Server(FoldQueryError::UnknownKind { kind: 0x42 });
        let (kind_code, detail) = classify_fold_query(&unknown_kind);
        assert_eq!(kind_code, NET_REGISTRY_ERR_UNKNOWN_KIND);
        assert!(detail.contains("0x0042"));

        // Server-side decode failures share the codec discriminant.
        let decode_failed =
            FoldQueryClientError::Server(FoldQueryError::DecodeFailed("boom".into()));
        let (decode_code, _) = classify_fold_query(&decode_failed);
        assert_eq!(decode_code, NET_REGISTRY_ERR_CODEC);
    }
}