Skip to main content

coralstack_cmd_ipc/
registry.rs

//! The [`CommandRegistry`] — core routing and execution hub.
//!
//! Mirrors the TypeScript `CommandRegistry` in
//! `packages/cmd-ipc/src/registry/command-registry.ts`: it owns a local
//! command table, a remote command table (command → owning channel), a
//! set of connected channels, and a handful of [`TtlMap`]s correlating
//! in-flight requests, forwarded routes, and recently-seen events.
//!
//! # Topology
//!
//! * **Root** registries have no `router_channel`. Unknown commands
//!   produce `NotFound` errors.
//! * **Child** registries set `router_channel = Some(peer_id)`. Unknown
//!   commands, and new local registrations, are escalated upstream.
//! * Events fan out across every connected channel; dedup by message
//!   id prevents echo loops in meshes.
//!
//! Private commands/events (identifiers starting with `_`) stay local
//! — never escalated, never advertised, never broadcast.
20
21use std::collections::{BTreeMap, HashMap};
22use std::future::Future;
23use std::sync::atomic::{AtomicU64, Ordering};
24use std::sync::Arc;
25use std::time::Duration;
26
27use futures::channel::oneshot;
28use futures::future::BoxFuture;
29use futures::FutureExt;
30use parking_lot::Mutex;
31use serde::Serialize;
32use serde_json::Value;
33use uuid::Uuid;
34
35use crate::channel::CommandChannel;
36use crate::command::Command;
37use crate::error::{ChannelError, CommandError, ExecuteErrorCode, RegisterErrorCode};
38use crate::event::Event;
39use crate::message::{
40    CommandDef, ExecuteError, ExecuteResult, False, Message, MessageId, RegisterResult, True,
41};
42use crate::ttl_map::TtlMap;
43
/// Configuration for a [`CommandRegistry`].
///
/// All fields are public; start from [`Config::default`] and override
/// the ones you need with struct-update syntax.
#[derive(Debug, Clone)]
pub struct Config {
    /// Registry identifier used in log messages. Defaults to a random
    /// UUID.
    pub id: Option<String>,
    /// Channel id to escalate unknown commands and new registrations
    /// to. Leave `None` for a root registry.
    pub router_channel: Option<String>,
    /// How long a pending `execute` / `register` reply can wait before
    /// being rejected with [`CommandError::Timeout`]. Zero disables
    /// the TTL check (request hangs until the channel closes).
    pub request_ttl: Duration,
    /// How long a seen event id is remembered for dedup purposes.
    pub event_ttl: Duration,
}
59
60impl Default for Config {
61    fn default() -> Self {
62        Self {
63            id: None,
64            router_channel: None,
65            request_ttl: Duration::from_secs(30),
66            event_ttl: Duration::from_secs(5),
67        }
68    }
69}
70
71type HandlerFn = dyn Fn(Value) -> BoxFuture<'static, Result<Value, ExecuteError>> + Send + Sync;
72type EventListener = Arc<dyn Fn(Value) + Send + Sync>;
73
74struct LocalEntry {
75    handler: Arc<HandlerFn>,
76    def: CommandDef,
77    is_private: bool,
78}
79
80struct PendingExecute {
81    tx: oneshot::Sender<ExecuteResult>,
82    target_channel: String,
83}
84
85/// Outcome delivered to the caller awaiting a `register.command.request`
86/// reply. The peer's `RegisterResult` is wrapped so the registry can
87/// synthesize timeout / disconnect errors distinguishable from the
88/// wire-level duplicate-command case.
89enum RegisterOutcome {
90    Wire(RegisterResult),
91    Timeout,
92    Disconnected,
93}
94
95struct PendingRegister {
96    tx: oneshot::Sender<RegisterOutcome>,
97    target_channel: String,
98}
99
/// Routing record for a forwarded request.
///
/// NOTE(review): the code that fills and consumes these records is not
/// in view here; presumably `origin_channel` is where the request came
/// from and `target_channel` is where it was forwarded — confirm.
struct RouteEntry {
    /// Channel id on the originating side of the route.
    origin_channel: String,
    /// Channel id on the receiving side of the route.
    target_channel: String,
}
104
105/// Shared state behind the [`CommandRegistry`] Arc.
106struct Inner {
107    id: String,
108    router_channel: Option<String>,
109    local: Mutex<HashMap<String, LocalEntry>>,
110    /// command id -> owning channel id
111    remote: Mutex<HashMap<String, String>>,
112    /// command id -> advertised definition (description + schema)
113    /// kept parallel to `remote` so `list_commands` can render
114    /// the same richness for remote entries as for local ones.
115    remote_defs: Mutex<HashMap<String, CommandDef>>,
116    channels: Mutex<HashMap<String, Arc<dyn CommandChannel>>>,
117    execute_replies: TtlMap<MessageId, PendingExecute>,
118    register_replies: TtlMap<MessageId, PendingRegister>,
119    routes: TtlMap<MessageId, RouteEntry>,
120    seen_events: TtlMap<MessageId, ()>,
121    /// Event listeners keyed by event id then by a monotonic token, so
122    /// `add_event_listener` can return an unsubscribe closure that
123    /// removes just that one listener. `BTreeMap` preserves insertion
124    /// order (tokens are monotonically increasing) for dispatch.
125    event_listeners: Mutex<HashMap<String, BTreeMap<u64, EventListener>>>,
126    /// Monotonically-increasing token used to key event listeners.
127    next_listener_token: AtomicU64,
128}
129
130/// The main entry point of the crate.
131///
132/// A registry is cheap to clone: internally it's an `Arc<Inner>`.
133#[derive(Clone)]
134pub struct CommandRegistry {
135    inner: Arc<Inner>,
136}
137
138impl CommandRegistry {
139    pub fn new(cfg: Config) -> Self {
140        // On TTL expiry, deliver an explicit Timeout outcome on the
141        // pending oneshot so the caller's `rx.await` resolves to
142        // `CommandError::Timeout` rather than blocking forever (or
143        // collapsing to a generic `ChannelDisconnected` when the tx is
144        // dropped). Eviction is lazy — driven by `sweep_expired()`
145        // calls in the driver loop and registry hot paths.
146        let execute_replies =
147            TtlMap::new(cfg.request_ttl).with_on_expire(|_, pending: PendingExecute| {
148                let _ = pending.tx.send(ExecuteResult::Err {
149                    ok: False,
150                    error: ExecuteError {
151                        code: ExecuteErrorCode::Timeout,
152                        message: "request timed out".into(),
153                    },
154                });
155            });
156        let register_replies =
157            TtlMap::new(cfg.request_ttl).with_on_expire(|_, pending: PendingRegister| {
158                let _ = pending.tx.send(RegisterOutcome::Timeout);
159            });
160
161        let inner = Arc::new(Inner {
162            id: cfg.id.unwrap_or_else(|| Uuid::new_v4().to_string()),
163            router_channel: cfg.router_channel,
164            local: Mutex::new(HashMap::new()),
165            remote: Mutex::new(HashMap::new()),
166            remote_defs: Mutex::new(HashMap::new()),
167            channels: Mutex::new(HashMap::new()),
168            execute_replies,
169            register_replies,
170            routes: TtlMap::new(cfg.request_ttl),
171            seen_events: TtlMap::new(cfg.event_ttl),
172            event_listeners: Mutex::new(HashMap::new()),
173            next_listener_token: AtomicU64::new(0),
174        });
175        Self { inner }
176    }
177
178    /// Returns this registry's identifier.
179    pub fn id(&self) -> &str {
180        &self.inner.id
181    }
182
183    /// Returns the ids of every currently-registered channel, sorted.
184    ///
185    /// Mirrors the TypeScript library's `listChannels()` method.
186    pub fn list_channels(&self) -> Vec<String> {
187        let mut ids: Vec<String> = self.inner.channels.lock().keys().cloned().collect();
188        ids.sort();
189        ids
190    }
191
192    /// Returns the full [`CommandDef`] (id + description + schema) for
193    /// every reachable command — local (non-private) and remote. Remote
194    /// defs are those advertised via `register.command.request` or
195    /// `list.commands.response` on the channel.
196    ///
197    /// Mirrors the TypeScript library's `listCommands()` method.
198    /// Results are sorted by id. A command id is only included once even
199    /// if both a local and remote entry exist (local wins).
200    pub fn list_commands(&self) -> Vec<CommandDef> {
201        let mut out: HashMap<String, CommandDef> = HashMap::new();
202        for (id, entry) in self.inner.local.lock().iter() {
203            if !entry.is_private {
204                out.insert(id.clone(), entry.def.clone());
205            }
206        }
207        for (id, def) in self.inner.remote_defs.lock().iter() {
208            out.entry(id.clone()).or_insert_with(|| def.clone());
209        }
210        let mut v: Vec<CommandDef> = out.into_values().collect();
211        v.sort_by(|a, b| a.id.cmp(&b.id));
212        v
213    }
214
215    /// Register a command on this registry.
216    ///
217    /// The single registration entry point, covering both compile-time
218    /// and runtime commands:
219    ///
220    /// - **Compile-time**: pass an instance of a type that implements
221    ///   [`Command`]. The `#[command]` / `#[command_service]` macros
222    ///   generate such types from a plain `async fn`.
223    /// - **Runtime**: pass a [`DynCommand`] carrying owned id /
224    ///   description / schema and a closure handler.
225    ///
226    /// Mirrors the TypeScript library's `registerCommand`.
227    ///
228    /// - Commands whose id starts with `_` stay local: they are never
229    ///   escalated to a `router_channel` and never advertised to peers
230    ///   via `list.commands.response`.
231    /// - Non-private commands are escalated upstream if this registry
232    ///   has a `router_channel`; the local entry is only committed
233    ///   after the router acks.
234    /// - The advertised schema is normalized via
235    ///   [`crate::schema::normalize_schema`] on the way in, so every
236    ///   schema leaving the registry is language-agnostic JSON Schema
237    ///   regardless of how the caller built it.
238    pub async fn register_command<C: Command>(&self, cmd: C) -> Result<(), CommandError> {
239        let id = cmd.id().to_string();
240        let description = cmd.description().map(str::to_string);
241        let schema = cmd.schema().map(crate::schema::normalize_command_schema);
242        let is_private = id.starts_with('_');
243        let def = CommandDef {
244            id: id.clone(),
245            description,
246            schema,
247        };
248        let handler: Arc<HandlerFn> = Arc::new({
249            let cmd = Arc::new(cmd);
250            move |value: Value| {
251                let cmd = cmd.clone();
252                async move {
253                    let req: C::Request =
254                        serde_json::from_value(value).map_err(|e| ExecuteError {
255                            code: ExecuteErrorCode::InvalidRequest,
256                            message: e.to_string(),
257                        })?;
258                    let res = cmd
259                        .handle(req)
260                        .await
261                        .map_err(|e| command_error_to_execute(&e, cmd.id()))?;
262                    serde_json::to_value(res).map_err(|e| ExecuteError {
263                        code: ExecuteErrorCode::InternalError,
264                        message: e.to_string(),
265                    })
266                }
267                .boxed()
268            }
269        });
270        self.register_inner(id, handler, def, is_private).await
271    }
272
273    async fn register_inner(
274        &self,
275        id: String,
276        handler: Arc<HandlerFn>,
277        def: CommandDef,
278        is_private: bool,
279    ) -> Result<(), CommandError> {
280        self.inner.execute_replies.sweep_expired();
281        self.inner.register_replies.sweep_expired();
282        // Duplicate check against the local table.
283        if self.inner.local.lock().contains_key(&id) {
284            return Err(CommandError::DuplicateCommand(id));
285        }
286
287        // Non-private commands escalate to the router before being added.
288        if !is_private {
289            if let Some(router_id) = self.inner.router_channel.clone() {
290                let router_ch = self.inner.channels.lock().get(&router_id).cloned();
291                if let Some(router_ch) = router_ch {
292                    let req_id = MessageId::new_v4();
293                    let (tx, rx) = oneshot::channel();
294                    self.inner.register_replies.insert(
295                        req_id,
296                        PendingRegister {
297                            tx,
298                            target_channel: router_id.clone(),
299                        },
300                    );
301                    router_ch
302                        .send(Message::RegisterCommandRequest {
303                            id: req_id,
304                            command: def.clone(),
305                        })
306                        .map_err(|_| CommandError::ChannelDisconnected)?;
307                    match rx.await {
308                        Ok(RegisterOutcome::Wire(RegisterResult::Ok { .. })) => {}
309                        Ok(RegisterOutcome::Wire(RegisterResult::Err { error, .. })) => {
310                            return Err(match error {
311                                RegisterErrorCode::DuplicateCommand => {
312                                    CommandError::DuplicateCommand(id)
313                                }
314                            });
315                        }
316                        Ok(RegisterOutcome::Timeout) => return Err(CommandError::Timeout),
317                        Ok(RegisterOutcome::Disconnected) | Err(_) => {
318                            return Err(CommandError::ChannelDisconnected);
319                        }
320                    }
321                }
322            }
323        }
324
325        self.inner.local.lock().insert(
326            id,
327            LocalEntry {
328                handler,
329                def,
330                is_private,
331            },
332        );
333        Ok(())
334    }
335
336    /// Connects a [`CommandChannel`] to this registry.
337    ///
338    /// Returns a driver future which must be polled by the caller's
339    /// executor (via `tokio::spawn`, `smol::spawn`,
340    /// `futures::executor::block_on`, …) for the registry to exchange
341    /// messages with the peer. The future completes when the channel
342    /// closes.
343    pub async fn register_channel(
344        &self,
345        channel: Arc<dyn CommandChannel>,
346    ) -> Result<impl Future<Output = ()> + Send + 'static, ChannelError> {
347        let id = channel.id().to_string();
348        {
349            let mut chans = self.inner.channels.lock();
350            if chans.contains_key(&id) {
351                return Err(ChannelError::Other(format!(
352                    "channel with id `{id}` already registered"
353                )));
354            }
355            chans.insert(id.clone(), channel.clone());
356        }
357
358        channel.start().await?;
359
360        // Ask the peer for its command list. The response is handled
361        // by the driver loop, which will register each entry as remote.
362        if let Err(e) = channel.send(Message::ListCommandsRequest {
363            id: MessageId::new_v4(),
364        }) {
365            self.inner.channels.lock().remove(&id);
366            return Err(e);
367        }
368
369        let inner = self.inner.clone();
370        let ch = channel;
371        Ok(async move {
372            while let Some(msg) = ch.recv().await {
373                // Opportunistic TTL sweep: any registry activity
374                // triggers a pass over the pending-reply / route maps
375                // so timed-out entries fire their on_expire callbacks
376                // (delivering Timeout to waiting callers). Cheap —
377                // O(n) over maps that are normally tiny.
378                inner.execute_replies.sweep_expired();
379                inner.register_replies.sweep_expired();
380                inner.routes.sweep_expired();
381                Inner::handle_message(inner.clone(), ch.clone(), msg).await;
382            }
383            Inner::handle_channel_close(&inner, ch.id());
384        })
385    }
386
387    /// Executes a command identified by a compile-time [`Command`] type
388    /// — the **strict** form, giving the same compile-time type safety
389    /// that TypeScript's strict-mode `executeCommand<K>` gives via the
390    /// `CommandSchemaMap` type parameter.
391    ///
392    /// The command id comes from `C::ID`, the request type is pinned to
393    /// `C::Request`, and the response type is pinned to `C::Response`,
394    /// so the compiler rejects mismatches at the call site:
395    ///
396    /// ```ignore
397    /// let sum: i64 = registry.execute::<MathAdd>(AddReq { a: 2, b: 3 }).await?;
398    /// ```
399    ///
400    /// For commands whose id or payload shape is only known at runtime
401    /// (scripting hosts, FFI, plugins that advertise their own schema),
402    /// use [`execute_dyn`](Self::execute_dyn).
403    pub async fn execute<C: Command>(
404        &self,
405        request: C::Request,
406    ) -> Result<C::Response, CommandError>
407    where
408        C::Request: Serialize,
409        C::Response: serde::de::DeserializeOwned,
410    {
411        let req_value = value_from_request(&request)?;
412        let result = self.execute_raw_impl(C::ID.to_string(), req_value).await?;
413        let deserialized = serde_json::from_value(result.unwrap_or(Value::Null))?;
414        Ok(deserialized)
415    }
416
417    /// Executes a command whose id is only known at runtime — the
418    /// **loose** form, mirroring the TypeScript library's
419    /// `executeCommand(id, args)` in loose mode.
420    ///
421    /// Request and response are raw [`serde_json::Value`]s, so this is
422    /// the canonical entry point for plugin hosts, scripting runtimes,
423    /// FFI bridges, and any code where the schema is discovered via
424    /// [`list_commands`](Self::list_commands) rather than declared at
425    /// compile time.
426    ///
427    /// For statically-known commands, prefer [`execute`](Self::execute)
428    /// — it pins both types via the [`Command`] trait.
429    pub async fn execute_dyn(
430        &self,
431        command_id: &str,
432        request: Value,
433    ) -> Result<Value, CommandError> {
434        let result = self
435            .execute_raw_impl(command_id.to_string(), request)
436            .await?;
437        Ok(result.unwrap_or(Value::Null))
438    }
439
440    async fn execute_raw_impl(
441        &self,
442        command_id: String,
443        request: Value,
444    ) -> Result<Option<Value>, CommandError> {
445        // Flush any expired pending replies so stale entries fire their
446        // Timeout on_expire before we enqueue a new one.
447        self.inner.execute_replies.sweep_expired();
448        self.inner.register_replies.sweep_expired();
449        // 1) Local handler wins.
450        let local_handler = self
451            .inner
452            .local
453            .lock()
454            .get(&command_id)
455            .map(|entry| entry.handler.clone());
456        if let Some(handler) = local_handler {
457            return handler(request)
458                .await
459                .map(Some)
460                .map_err(|e| e.into_command_error(&command_id));
461        }
462
463        // 2) Known remote command.
464        let remote_target = self.inner.remote.lock().get(&command_id).cloned();
465        let target = match remote_target {
466            Some(t) => Some(t),
467            None => self.inner.router_channel.clone(),
468        };
469
470        let Some(target_id) = target else {
471            return Err(CommandError::NotFound(command_id));
472        };
473
474        let channel = self.inner.channels.lock().get(&target_id).cloned();
475        let Some(channel) = channel else {
476            return Err(CommandError::ChannelDisconnected);
477        };
478
479        self.forward_execute(command_id, request, &channel, target_id)
480            .await
481    }
482
483    async fn forward_execute(
484        &self,
485        command_id: String,
486        request: Value,
487        channel: &Arc<dyn CommandChannel>,
488        target_id: String,
489    ) -> Result<Option<Value>, CommandError> {
490        let req_id = MessageId::new_v4();
491        let (tx, rx) = oneshot::channel();
492        self.inner.execute_replies.insert(
493            req_id,
494            PendingExecute {
495                tx,
496                target_channel: target_id,
497            },
498        );
499        channel
500            .send(Message::ExecuteCommandRequest {
501                id: req_id,
502                command_id: command_id.clone(),
503                // Void requests are elided from the wire (Null → None)
504                // so peers expecting an absent `request` field (per the
505                // JSON Schema spec) don't see `"request": null`.
506                request: value_to_wire(request),
507            })
508            .map_err(|_| CommandError::ChannelDisconnected)?;
509
510        match rx.await {
511            Ok(ExecuteResult::Ok { result, .. }) => Ok(result),
512            Ok(ExecuteResult::Err { error, .. }) => Err(error_to_command_error(error, &command_id)),
513            Err(_) => {
514                self.inner.execute_replies.remove(&req_id);
515                Err(CommandError::ChannelDisconnected)
516            }
517        }
518    }
519
520    /// Emit an event. Dispatches to local listeners and — unless the
521    /// event id is private (starts with `_`) — broadcasts to every
522    /// connected channel.
523    ///
524    /// Works for both compile-time events (`#[event]`-annotated
525    /// structs) and runtime events ([`DynEvent`](crate::command::Command)
526    /// — actually [`DynEvent`](crate::event::DynEvent)). Id, description,
527    /// and schema are all read off the event instance.
528    pub fn emit<E: Event>(&self, event: E) -> Result<(), CommandError> {
529        let event_id = event.id().to_string();
530        let payload_value = serde_json::to_value(&event)?;
531        let msg_id = MessageId::new_v4();
532        self.inner.seen_events.insert(msg_id, ());
533
534        self.dispatch_event_locally(&event_id, &payload_value);
535
536        if !event_id.starts_with('_') {
537            let channels: Vec<Arc<dyn CommandChannel>> =
538                self.inner.channels.lock().values().cloned().collect();
539            // Void payloads (serde `()` → `Value::Null`) are elided
540            // from the wire per the event schema (`payload` is optional).
541            let wire_payload = value_to_wire(payload_value);
542            for ch in channels {
543                let _ = ch.send(Message::Event {
544                    id: msg_id,
545                    event_id: event_id.clone(),
546                    payload: wire_payload.clone(),
547                });
548            }
549        }
550        Ok(())
551    }
552
553    /// Subscribe a typed listener. The callback receives a
554    /// deserialized `E` every time an event with id `E::ID` fires,
555    /// whether emitted locally or received from a connected channel.
556    ///
557    /// Returns an unsubscribe closure — call it (and drop it) to
558    /// remove just this listener. Ignoring the return value is fine;
559    /// the listener then lives for the life of the registry.
560    ///
561    /// Listeners for the same event fire in insertion order. Payloads
562    /// that fail to deserialize into `E` are silently dropped for
563    /// this listener — they still flow to any typed-for-Value
564    /// listeners registered via [`on_dyn`](Self::on_dyn).
565    pub fn on<E: Event + serde::de::DeserializeOwned>(
566        &self,
567        listener: impl Fn(E) + Send + Sync + 'static,
568    ) -> impl FnOnce() + Send + Sync + 'static {
569        self.install_listener(E::ID, move |value| {
570            if let Ok(typed) = serde_json::from_value::<E>(value) {
571                listener(typed);
572            }
573        })
574    }
575
576    /// Subscribe a dynamic listener by runtime id. The callback
577    /// receives the raw JSON payload. Use this when the event id is
578    /// only known at runtime (plugin runtimes, FFI, scripting hosts);
579    /// prefer [`on`](Self::on) whenever you have a compile-time
580    /// [`Event`] type.
581    ///
582    /// Same unsubscribe semantics as [`on`](Self::on).
583    pub fn on_dyn<F>(
584        &self,
585        event_id: impl Into<String>,
586        listener: F,
587    ) -> impl FnOnce() + Send + Sync + 'static
588    where
589        F: Fn(Value) + Send + Sync + 'static,
590    {
591        self.install_listener(&event_id.into(), listener)
592    }
593
594    fn install_listener<F>(
595        &self,
596        event_id: &str,
597        listener: F,
598    ) -> impl FnOnce() + Send + Sync + 'static
599    where
600        F: Fn(Value) + Send + Sync + 'static,
601    {
602        let token = self
603            .inner
604            .next_listener_token
605            .fetch_add(1, Ordering::Relaxed);
606        self.inner
607            .event_listeners
608            .lock()
609            .entry(event_id.to_string())
610            .or_default()
611            .insert(token, Arc::new(listener));
612
613        let inner = Arc::clone(&self.inner);
614        let event_id = event_id.to_string();
615        move || {
616            let mut map = inner.event_listeners.lock();
617            if let Some(slot) = map.get_mut(&event_id) {
618                slot.remove(&token);
619                if slot.is_empty() {
620                    map.remove(&event_id);
621                }
622            }
623        }
624    }
625
626    fn dispatch_event_locally(&self, event_id: &str, payload: &Value) {
627        let listeners: Vec<EventListener> = self
628            .inner
629            .event_listeners
630            .lock()
631            .get(event_id)
632            .map(|m| m.values().cloned().collect())
633            .unwrap_or_default();
634        for l in listeners {
635            l(payload.clone());
636        }
637    }
638
639    /// Tears down the registry: awaits `close()` on every connected
640    /// channel, drops every local and remote command, and clears all
641    /// event listeners. In-flight executes and register requests fail
642    /// with [`CommandError::ChannelDisconnected`] via the existing
643    /// channel-close path.
644    ///
645    /// Mirrors the TypeScript library's `dispose()`, but async so that
646    /// transports doing real teardown work (HTTP flush, MCP goodbye,
647    /// plugin sandbox shutdown) complete before this returns.
648    ///
649    /// Callers normally don't need this — dropping the last
650    /// `CommandRegistry` clone releases the inner state automatically
651    /// via `Drop`. Use `dispose` when a *shared* registry (held through
652    /// multiple clones) needs to be forcibly torn down, or in tests.
653    pub async fn dispose(&self) {
654        // Snapshot channel arcs so we can call `close` without holding
655        // the channels lock for the duration.
656        let channels: Vec<Arc<dyn CommandChannel>> = {
657            let mut locked = self.inner.channels.lock();
658            let out: Vec<_> = locked.values().cloned().collect();
659            locked.clear();
660            out
661        };
662
663        // Await each channel's close sequentially. Channels define
664        // their own close semantics (InMemoryChannel is effectively
665        // synchronous; MCPServerChannel flushes the transport; Flow's
666        // SourceChannel tears down its QuickJS VM), so we let every
667        // implementation finish its teardown before returning.
668        for ch in channels {
669            ch.close().await;
670        }
671
672        self.inner.local.lock().clear();
673        self.inner.remote.lock().clear();
674        self.inner.remote_defs.lock().clear();
675        self.inner.event_listeners.lock().clear();
676    }
677}
678
679impl Inner {
680    fn local_command_defs(&self) -> Vec<CommandDef> {
681        self.local
682            .lock()
683            .values()
684            .filter(|e| !e.is_private)
685            .map(|e| e.def.clone())
686            .collect()
687    }
688
689    /// Central dispatcher invoked by each channel's driver loop.
690    async fn handle_message(inner: Arc<Self>, channel: Arc<dyn CommandChannel>, msg: Message) {
691        match msg {
692            Message::RegisterCommandRequest { id, command } => {
693                Self::handle_register_request(inner, channel, id, command).await;
694            }
695            Message::RegisterCommandResponse { thid, response, .. } => {
696                if let Some(pending) = inner.register_replies.remove(&thid) {
697                    let _ = pending.tx.send(RegisterOutcome::Wire(response));
698                }
699            }
700            Message::ListCommandsRequest { id } => {
701                let commands = inner.local_command_defs();
702                let _ = channel.send(Message::ListCommandsResponse {
703                    id: MessageId::new_v4(),
704                    thid: id,
705                    commands,
706                });
707            }
708            Message::ListCommandsResponse { commands, .. } => {
709                let channel_id = channel.id().to_string();
710                let mut remote = inner.remote.lock();
711                let mut remote_defs = inner.remote_defs.lock();
712                for cmd in commands {
713                    // Normalize ingested schemas so the local cache
714                    // matches what register() produces.
715                    let cmd = CommandDef {
716                        id: cmd.id,
717                        description: cmd.description,
718                        schema: cmd.schema.map(crate::schema::normalize_command_schema),
719                    };
720                    let entry_is_new = !remote.contains_key(&cmd.id);
721                    if entry_is_new {
722                        remote.insert(cmd.id.clone(), channel_id.clone());
723                    }
724                    // Always refresh the def (the latest advertisement wins).
725                    remote_defs.insert(cmd.id.clone(), cmd);
726                }
727            }
728            Message::ExecuteCommandRequest {
729                id,
730                command_id,
731                request,
732            } => {
733                Self::handle_execute_request(
734                    inner,
735                    channel,
736                    id,
737                    command_id,
738                    request.unwrap_or(Value::Null),
739                )
740                .await;
741            }
742            Message::ExecuteCommandResponse { thid, response, .. } => {
743                Self::handle_execute_response(&inner, thid, response);
744            }
745            Message::Event {
746                id,
747                event_id,
748                payload,
749            } => {
750                Self::handle_event(&inner, channel, id, event_id, payload);
751            }
752        }
753    }
754
755    async fn handle_register_request(
756        inner: Arc<Self>,
757        channel: Arc<dyn CommandChannel>,
758        req_id: MessageId,
759        command: CommandDef,
760    ) {
761        // Normalize ingested schemas so our cached copy is guaranteed
762        // to be language-agnostic JSON Schema, even if the peer didn't
763        // normalize on its side.
764        let command = CommandDef {
765            id: command.id,
766            description: command.description,
767            schema: command.schema.map(crate::schema::normalize_command_schema),
768        };
769        let channel_id = channel.id().to_string();
770        let command_id = command.id.clone();
771
772        // Duplicate against local?
773        let dup = inner.local.lock().contains_key(&command_id);
774        if dup {
775            let _ = channel.send(Message::RegisterCommandResponse {
776                id: MessageId::new_v4(),
777                thid: req_id,
778                response: RegisterResult::Err {
779                    ok: False,
780                    error: RegisterErrorCode::DuplicateCommand,
781                },
782            });
783            return;
784        }
785
786        // Already known in the remote table?
787        // - Same channel re-advertising: short-circuit with a success
788        //   ack, do NOT re-escalate upstream. A re-escalation would
789        //   bubble up as a duplicate rejection from the router and
790        //   spuriously fail a legitimate re-registration (common when
791        //   a plugin host re-advertises its command list).
792        // - Different channel claiming the same id: reject as duplicate.
793        let existing_owner = inner.remote.lock().get(&command_id).cloned();
794        match existing_owner {
795            Some(owner) if owner == channel_id => {
796                // Refresh the cached def and re-ack. No upstream traffic.
797                inner.remote_defs.lock().insert(command_id, command);
798                let _ = channel.send(Message::RegisterCommandResponse {
799                    id: MessageId::new_v4(),
800                    thid: req_id,
801                    response: RegisterResult::Ok { ok: True },
802                });
803                return;
804            }
805            Some(_) => {
806                let _ = channel.send(Message::RegisterCommandResponse {
807                    id: MessageId::new_v4(),
808                    thid: req_id,
809                    response: RegisterResult::Err {
810                        ok: False,
811                        error: RegisterErrorCode::DuplicateCommand,
812                    },
813                });
814                return;
815            }
816            None => {}
817        }
818
819        // Escalate upstream if we have a router.
820        if let Some(router_id) = inner.router_channel.clone() {
821            if router_id != channel_id {
822                let router_ch = inner.channels.lock().get(&router_id).cloned();
823                if let Some(router_ch) = router_ch {
824                    let up_id = MessageId::new_v4();
825                    let (tx, rx) = oneshot::channel();
826                    inner.register_replies.insert(
827                        up_id,
828                        PendingRegister {
829                            tx,
830                            target_channel: router_id,
831                        },
832                    );
833                    if router_ch
834                        .send(Message::RegisterCommandRequest {
835                            id: up_id,
836                            command: command.clone(),
837                        })
838                        .is_ok()
839                    {
840                        let up = rx.await;
841                        match up {
842                            Ok(RegisterOutcome::Wire(RegisterResult::Ok { .. })) => {}
843                            Ok(RegisterOutcome::Wire(RegisterResult::Err { error, .. })) => {
844                                let _ = channel.send(Message::RegisterCommandResponse {
845                                    id: MessageId::new_v4(),
846                                    thid: req_id,
847                                    response: RegisterResult::Err { ok: False, error },
848                                });
849                                return;
850                            }
851                            // Timeout / disconnected upstream: we have no
852                            // wire-level error code for these on the
853                            // register response, so surface them as
854                            // duplicate_command to match the prior
855                            // behaviour. The escalating caller's own
856                            // register_command call will see the correct
857                            // Timeout / ChannelDisconnected via its own
858                            // pending entry.
859                            Ok(RegisterOutcome::Timeout)
860                            | Ok(RegisterOutcome::Disconnected)
861                            | Err(_) => {
862                                let _ = channel.send(Message::RegisterCommandResponse {
863                                    id: MessageId::new_v4(),
864                                    thid: req_id,
865                                    response: RegisterResult::Err {
866                                        ok: False,
867                                        error: RegisterErrorCode::DuplicateCommand,
868                                    },
869                                });
870                                return;
871                            }
872                        }
873                    }
874                }
875            }
876        }
877
878        inner.remote.lock().insert(command_id.clone(), channel_id);
879        inner.remote_defs.lock().insert(command_id, command);
880        let _ = channel.send(Message::RegisterCommandResponse {
881            id: MessageId::new_v4(),
882            thid: req_id,
883            response: RegisterResult::Ok { ok: True },
884        });
885    }
886
887    async fn handle_execute_request(
888        inner: Arc<Self>,
889        origin: Arc<dyn CommandChannel>,
890        req_id: MessageId,
891        command_id: String,
892        request: Value,
893    ) {
894        // Local handler?
895        let handler = inner
896            .local
897            .lock()
898            .get(&command_id)
899            .map(|e| e.handler.clone());
900        if let Some(handler) = handler {
901            let result = handler(request).await;
902            let response = match result {
903                Ok(v) => ExecuteResult::Ok {
904                    ok: True,
905                    // Void responses (`() → Value::Null`) are elided from
906                    // the wire per the response schema (`result` optional).
907                    result: value_to_wire(v),
908                },
909                Err(error) => ExecuteResult::Err { ok: False, error },
910            };
911            let _ = origin.send(Message::ExecuteCommandResponse {
912                id: MessageId::new_v4(),
913                thid: req_id,
914                response,
915            });
916            return;
917        }
918
919        // Forward?
920        let target_id = inner
921            .remote
922            .lock()
923            .get(&command_id)
924            .cloned()
925            .or_else(|| inner.router_channel.clone());
926
927        let origin_id = origin.id().to_string();
928        let Some(target_id) = target_id else {
929            let _ = origin.send(Message::ExecuteCommandResponse {
930                id: MessageId::new_v4(),
931                thid: req_id,
932                response: ExecuteResult::Err {
933                    ok: False,
934                    error: ExecuteError {
935                        code: ExecuteErrorCode::NotFound,
936                        message: format!("command not found: {command_id}"),
937                    },
938                },
939            });
940            return;
941        };
942
943        if target_id == origin_id {
944            // Would loop; treat as not found.
945            let _ = origin.send(Message::ExecuteCommandResponse {
946                id: MessageId::new_v4(),
947                thid: req_id,
948                response: ExecuteResult::Err {
949                    ok: False,
950                    error: ExecuteError {
951                        code: ExecuteErrorCode::NotFound,
952                        message: format!("command not found: {command_id}"),
953                    },
954                },
955            });
956            return;
957        }
958
959        let target = inner.channels.lock().get(&target_id).cloned();
960        let Some(target) = target else {
961            let _ = origin.send(Message::ExecuteCommandResponse {
962                id: MessageId::new_v4(),
963                thid: req_id,
964                response: ExecuteResult::Err {
965                    ok: False,
966                    error: ExecuteError {
967                        code: ExecuteErrorCode::ChannelDisconnected,
968                        message: "target channel disconnected".into(),
969                    },
970                },
971            });
972            return;
973        };
974
975        inner.routes.insert(
976            req_id,
977            RouteEntry {
978                origin_channel: origin_id,
979                target_channel: target_id,
980            },
981        );
982        let _ = target.send(Message::ExecuteCommandRequest {
983            id: req_id,
984            command_id,
985            request: value_to_wire(request),
986        });
987    }
988
989    fn handle_execute_response(inner: &Arc<Self>, thid: MessageId, response: ExecuteResult) {
990        // Either this is a reply to a local call…
991        if let Some(pending) = inner.execute_replies.remove(&thid) {
992            let _ = pending.tx.send(response);
993            return;
994        }
995
996        // …or we forwarded this request and need to route the reply.
997        if let Some(route) = inner.routes.remove(&thid) {
998            let origin = inner.channels.lock().get(&route.origin_channel).cloned();
999            if let Some(origin) = origin {
1000                let _ = origin.send(Message::ExecuteCommandResponse {
1001                    id: MessageId::new_v4(),
1002                    thid,
1003                    response,
1004                });
1005            }
1006        }
1007    }
1008
1009    fn handle_event(
1010        inner: &Arc<Self>,
1011        origin: Arc<dyn CommandChannel>,
1012        msg_id: MessageId,
1013        event_id: String,
1014        payload: Option<Value>,
1015    ) {
1016        if inner.seen_events.contains_key(&msg_id) {
1017            return;
1018        }
1019        inner.seen_events.insert(msg_id, ());
1020
1021        let payload_value = payload.clone().unwrap_or(Value::Null);
1022        let listeners: Vec<EventListener> = inner
1023            .event_listeners
1024            .lock()
1025            .get(&event_id)
1026            .map(|m| m.values().cloned().collect())
1027            .unwrap_or_default();
1028        for l in listeners {
1029            l(payload_value.clone());
1030        }
1031
1032        if event_id.starts_with('_') {
1033            return;
1034        }
1035
1036        let channels: Vec<Arc<dyn CommandChannel>> = inner
1037            .channels
1038            .lock()
1039            .iter()
1040            .filter(|(k, _)| k.as_str() != origin.id())
1041            .map(|(_, v)| v.clone())
1042            .collect();
1043        for ch in channels {
1044            let _ = ch.send(Message::Event {
1045                id: msg_id,
1046                event_id: event_id.clone(),
1047                payload: payload.clone(),
1048            });
1049        }
1050    }
1051
1052    /// Invoked by the driver once the channel returns `None` from recv.
1053    fn handle_channel_close(inner: &Arc<Self>, channel_id: &str) {
1054        // Drop the channel from the lookup table.
1055        inner.channels.lock().remove(channel_id);
1056
1057        // Drop every remote command owned by this channel, along with
1058        // its cached definition.
1059        let dropped_ids: Vec<String> = {
1060            let mut remote = inner.remote.lock();
1061            let to_drop: Vec<String> = remote
1062                .iter()
1063                .filter(|(_, owner)| *owner == channel_id)
1064                .map(|(id, _)| id.clone())
1065                .collect();
1066            for id in &to_drop {
1067                remote.remove(id);
1068            }
1069            to_drop
1070        };
1071        let mut remote_defs = inner.remote_defs.lock();
1072        for id in dropped_ids {
1073            remote_defs.remove(&id);
1074        }
1075        drop(remote_defs);
1076
1077        // Reject any pending executes whose response was expected from
1078        // this channel.
1079        let exec_ids: Vec<MessageId> = inner
1080            .execute_replies
1081            .snapshot_keys_where(|v| v.target_channel == channel_id);
1082        for id in exec_ids {
1083            if let Some(pending) = inner.execute_replies.remove(&id) {
1084                let _ = pending.tx.send(ExecuteResult::Err {
1085                    ok: False,
1086                    error: ExecuteError {
1087                        code: ExecuteErrorCode::ChannelDisconnected,
1088                        message: "channel disconnected".into(),
1089                    },
1090                });
1091            }
1092        }
1093
1094        let reg_ids: Vec<MessageId> = inner
1095            .register_replies
1096            .snapshot_keys_where(|v| v.target_channel == channel_id);
1097        for id in reg_ids {
1098            if let Some(pending) = inner.register_replies.remove(&id) {
1099                let _ = pending.tx.send(RegisterOutcome::Disconnected);
1100            }
1101        }
1102
1103        // For every route where either endpoint is the dead channel,
1104        // notify the origin (if it is still alive).
1105        let route_ids: Vec<MessageId> = inner.routes.snapshot_keys_where(|r| {
1106            r.origin_channel == channel_id || r.target_channel == channel_id
1107        });
1108        for id in route_ids {
1109            if let Some(route) = inner.routes.remove(&id) {
1110                if route.origin_channel == channel_id {
1111                    continue;
1112                }
1113                let origin = inner.channels.lock().get(&route.origin_channel).cloned();
1114                if let Some(origin) = origin {
1115                    let _ = origin.send(Message::ExecuteCommandResponse {
1116                        id: MessageId::new_v4(),
1117                        thid: id,
1118                        response: ExecuteResult::Err {
1119                            ok: False,
1120                            error: ExecuteError {
1121                                code: ExecuteErrorCode::ChannelDisconnected,
1122                                message: "target channel disconnected".into(),
1123                            },
1124                        },
1125                    });
1126                }
1127            }
1128        }
1129    }
1130}
1131
1132// ------- helpers -----------------------------------------------------
1133
1134fn command_error_to_execute(e: &CommandError, command_id: &str) -> ExecuteError {
1135    match e {
1136        CommandError::InvalidRequest { message, .. } => ExecuteError {
1137            code: ExecuteErrorCode::InvalidRequest,
1138            message: message.clone(),
1139        },
1140        CommandError::Internal { message, .. } => ExecuteError {
1141            code: ExecuteErrorCode::InternalError,
1142            message: message.clone(),
1143        },
1144        CommandError::Timeout => ExecuteError {
1145            code: ExecuteErrorCode::Timeout,
1146            message: "request timed out".into(),
1147        },
1148        CommandError::ChannelDisconnected => ExecuteError {
1149            code: ExecuteErrorCode::ChannelDisconnected,
1150            message: "channel disconnected".into(),
1151        },
1152        CommandError::NotFound(id) => ExecuteError {
1153            code: ExecuteErrorCode::NotFound,
1154            message: format!("command not found: {id}"),
1155        },
1156        _ => ExecuteError {
1157            code: ExecuteErrorCode::InternalError,
1158            message: format!("{e} [command {command_id}]"),
1159        },
1160    }
1161}
1162
1163fn error_to_command_error(err: ExecuteError, command_id: &str) -> CommandError {
1164    match err.code {
1165        ExecuteErrorCode::NotFound => CommandError::NotFound(command_id.into()),
1166        ExecuteErrorCode::InvalidRequest => CommandError::InvalidRequest {
1167            command_id: command_id.into(),
1168            message: err.message,
1169        },
1170        ExecuteErrorCode::InternalError => CommandError::Internal {
1171            command_id: command_id.into(),
1172            message: err.message,
1173        },
1174        ExecuteErrorCode::Timeout => CommandError::Timeout,
1175        ExecuteErrorCode::ChannelDisconnected => CommandError::ChannelDisconnected,
1176    }
1177}
1178
1179// Small convenience on ExecuteError.
1180impl ExecuteError {
1181    fn into_command_error(self, command_id: &str) -> CommandError {
1182        error_to_command_error(self, command_id)
1183    }
1184}
1185
1186/// Collapse a serialized request/result/payload value to `None` when it
1187/// is JSON `null`. This is what makes void commands and events
1188/// spec-compliant on the wire: `request` / `result` / `payload` are all
1189/// optional fields in the JSON schemas, so an absent value must be
1190/// encoded by omitting the key, not by emitting `null`.
1191///
1192/// Used on every outgoing `execute.command.request`,
1193/// `execute.command.response` success, and `event` message.
1194fn value_to_wire(v: Value) -> Option<Value> {
1195    if v.is_null() {
1196        None
1197    } else {
1198        Some(v)
1199    }
1200}
1201
1202/// Serialize a strict-mode request value to JSON. Wraps `serde_json`
1203/// with the right error type for the strict `execute::<C>` path.
1204fn value_from_request<T: Serialize>(v: &T) -> Result<Value, CommandError> {
1205    serde_json::to_value(v).map_err(CommandError::Serde)
1206}