1use std::collections::{BTreeMap, HashMap};
22use std::future::Future;
23use std::sync::atomic::{AtomicU64, Ordering};
24use std::sync::Arc;
25use std::time::Duration;
26
27use futures::channel::oneshot;
28use futures::future::BoxFuture;
29use futures::FutureExt;
30use parking_lot::Mutex;
31use serde::Serialize;
32use serde_json::Value;
33use uuid::Uuid;
34
35use crate::channel::CommandChannel;
36use crate::command::Command;
37use crate::error::{ChannelError, CommandError, ExecuteErrorCode, RegisterErrorCode};
38use crate::event::Event;
39use crate::message::{
40 CommandDef, ExecuteError, ExecuteResult, False, Message, MessageId, RegisterResult, True,
41};
42use crate::ttl_map::TtlMap;
43
44pub struct Config {
46 pub id: Option<String>,
49 pub router_channel: Option<String>,
52 pub request_ttl: Duration,
56 pub event_ttl: Duration,
58}
59
60impl Default for Config {
61 fn default() -> Self {
62 Self {
63 id: None,
64 router_channel: None,
65 request_ttl: Duration::from_secs(30),
66 event_ttl: Duration::from_secs(5),
67 }
68 }
69}
70
71type HandlerFn = dyn Fn(Value) -> BoxFuture<'static, Result<Value, ExecuteError>> + Send + Sync;
72type EventListener = Arc<dyn Fn(Value) + Send + Sync>;
73
74struct LocalEntry {
75 handler: Arc<HandlerFn>,
76 def: CommandDef,
77 is_private: bool,
78}
79
80struct PendingExecute {
81 tx: oneshot::Sender<ExecuteResult>,
82 target_channel: String,
83}
84
85enum RegisterOutcome {
90 Wire(RegisterResult),
91 Timeout,
92 Disconnected,
93}
94
95struct PendingRegister {
96 tx: oneshot::Sender<RegisterOutcome>,
97 target_channel: String,
98}
99
100struct RouteEntry {
101 origin_channel: String,
102 target_channel: String,
103}
104
105struct Inner {
107 id: String,
108 router_channel: Option<String>,
109 local: Mutex<HashMap<String, LocalEntry>>,
110 remote: Mutex<HashMap<String, String>>,
112 remote_defs: Mutex<HashMap<String, CommandDef>>,
116 channels: Mutex<HashMap<String, Arc<dyn CommandChannel>>>,
117 execute_replies: TtlMap<MessageId, PendingExecute>,
118 register_replies: TtlMap<MessageId, PendingRegister>,
119 routes: TtlMap<MessageId, RouteEntry>,
120 seen_events: TtlMap<MessageId, ()>,
121 event_listeners: Mutex<HashMap<String, BTreeMap<u64, EventListener>>>,
126 next_listener_token: AtomicU64,
128}
129
130#[derive(Clone)]
134pub struct CommandRegistry {
135 inner: Arc<Inner>,
136}
137
138impl CommandRegistry {
139 pub fn new(cfg: Config) -> Self {
140 let execute_replies =
147 TtlMap::new(cfg.request_ttl).with_on_expire(|_, pending: PendingExecute| {
148 let _ = pending.tx.send(ExecuteResult::Err {
149 ok: False,
150 error: ExecuteError {
151 code: ExecuteErrorCode::Timeout,
152 message: "request timed out".into(),
153 },
154 });
155 });
156 let register_replies =
157 TtlMap::new(cfg.request_ttl).with_on_expire(|_, pending: PendingRegister| {
158 let _ = pending.tx.send(RegisterOutcome::Timeout);
159 });
160
161 let inner = Arc::new(Inner {
162 id: cfg.id.unwrap_or_else(|| Uuid::new_v4().to_string()),
163 router_channel: cfg.router_channel,
164 local: Mutex::new(HashMap::new()),
165 remote: Mutex::new(HashMap::new()),
166 remote_defs: Mutex::new(HashMap::new()),
167 channels: Mutex::new(HashMap::new()),
168 execute_replies,
169 register_replies,
170 routes: TtlMap::new(cfg.request_ttl),
171 seen_events: TtlMap::new(cfg.event_ttl),
172 event_listeners: Mutex::new(HashMap::new()),
173 next_listener_token: AtomicU64::new(0),
174 });
175 Self { inner }
176 }
177
178 pub fn id(&self) -> &str {
180 &self.inner.id
181 }
182
183 pub fn list_channels(&self) -> Vec<String> {
187 let mut ids: Vec<String> = self.inner.channels.lock().keys().cloned().collect();
188 ids.sort();
189 ids
190 }
191
192 pub fn list_commands(&self) -> Vec<CommandDef> {
201 let mut out: HashMap<String, CommandDef> = HashMap::new();
202 for (id, entry) in self.inner.local.lock().iter() {
203 if !entry.is_private {
204 out.insert(id.clone(), entry.def.clone());
205 }
206 }
207 for (id, def) in self.inner.remote_defs.lock().iter() {
208 out.entry(id.clone()).or_insert_with(|| def.clone());
209 }
210 let mut v: Vec<CommandDef> = out.into_values().collect();
211 v.sort_by(|a, b| a.id.cmp(&b.id));
212 v
213 }
214
215 pub async fn register_command<C: Command>(&self, cmd: C) -> Result<(), CommandError> {
239 let id = cmd.id().to_string();
240 let description = cmd.description().map(str::to_string);
241 let schema = cmd.schema().map(crate::schema::normalize_command_schema);
242 let is_private = id.starts_with('_');
243 let def = CommandDef {
244 id: id.clone(),
245 description,
246 schema,
247 };
248 let handler: Arc<HandlerFn> = Arc::new({
249 let cmd = Arc::new(cmd);
250 move |value: Value| {
251 let cmd = cmd.clone();
252 async move {
253 let req: C::Request =
254 serde_json::from_value(value).map_err(|e| ExecuteError {
255 code: ExecuteErrorCode::InvalidRequest,
256 message: e.to_string(),
257 })?;
258 let res = cmd
259 .handle(req)
260 .await
261 .map_err(|e| command_error_to_execute(&e, cmd.id()))?;
262 serde_json::to_value(res).map_err(|e| ExecuteError {
263 code: ExecuteErrorCode::InternalError,
264 message: e.to_string(),
265 })
266 }
267 .boxed()
268 }
269 });
270 self.register_inner(id, handler, def, is_private).await
271 }
272
273 async fn register_inner(
274 &self,
275 id: String,
276 handler: Arc<HandlerFn>,
277 def: CommandDef,
278 is_private: bool,
279 ) -> Result<(), CommandError> {
280 self.inner.execute_replies.sweep_expired();
281 self.inner.register_replies.sweep_expired();
282 if self.inner.local.lock().contains_key(&id) {
284 return Err(CommandError::DuplicateCommand(id));
285 }
286
287 if !is_private {
289 if let Some(router_id) = self.inner.router_channel.clone() {
290 let router_ch = self.inner.channels.lock().get(&router_id).cloned();
291 if let Some(router_ch) = router_ch {
292 let req_id = MessageId::new_v4();
293 let (tx, rx) = oneshot::channel();
294 self.inner.register_replies.insert(
295 req_id,
296 PendingRegister {
297 tx,
298 target_channel: router_id.clone(),
299 },
300 );
301 router_ch
302 .send(Message::RegisterCommandRequest {
303 id: req_id,
304 command: def.clone(),
305 })
306 .map_err(|_| CommandError::ChannelDisconnected)?;
307 match rx.await {
308 Ok(RegisterOutcome::Wire(RegisterResult::Ok { .. })) => {}
309 Ok(RegisterOutcome::Wire(RegisterResult::Err { error, .. })) => {
310 return Err(match error {
311 RegisterErrorCode::DuplicateCommand => {
312 CommandError::DuplicateCommand(id)
313 }
314 });
315 }
316 Ok(RegisterOutcome::Timeout) => return Err(CommandError::Timeout),
317 Ok(RegisterOutcome::Disconnected) | Err(_) => {
318 return Err(CommandError::ChannelDisconnected);
319 }
320 }
321 }
322 }
323 }
324
325 self.inner.local.lock().insert(
326 id,
327 LocalEntry {
328 handler,
329 def,
330 is_private,
331 },
332 );
333 Ok(())
334 }
335
336 pub async fn register_channel(
344 &self,
345 channel: Arc<dyn CommandChannel>,
346 ) -> Result<impl Future<Output = ()> + Send + 'static, ChannelError> {
347 let id = channel.id().to_string();
348 {
349 let mut chans = self.inner.channels.lock();
350 if chans.contains_key(&id) {
351 return Err(ChannelError::Other(format!(
352 "channel with id `{id}` already registered"
353 )));
354 }
355 chans.insert(id.clone(), channel.clone());
356 }
357
358 channel.start().await?;
359
360 if let Err(e) = channel.send(Message::ListCommandsRequest {
363 id: MessageId::new_v4(),
364 }) {
365 self.inner.channels.lock().remove(&id);
366 return Err(e);
367 }
368
369 let inner = self.inner.clone();
370 let ch = channel;
371 Ok(async move {
372 while let Some(msg) = ch.recv().await {
373 inner.execute_replies.sweep_expired();
379 inner.register_replies.sweep_expired();
380 inner.routes.sweep_expired();
381 Inner::handle_message(inner.clone(), ch.clone(), msg).await;
382 }
383 Inner::handle_channel_close(&inner, ch.id());
384 })
385 }
386
387 pub async fn execute<C: Command>(
404 &self,
405 request: C::Request,
406 ) -> Result<C::Response, CommandError>
407 where
408 C::Request: Serialize,
409 C::Response: serde::de::DeserializeOwned,
410 {
411 let req_value = value_from_request(&request)?;
412 let result = self.execute_raw_impl(C::ID.to_string(), req_value).await?;
413 let deserialized = serde_json::from_value(result.unwrap_or(Value::Null))?;
414 Ok(deserialized)
415 }
416
417 pub async fn execute_dyn(
430 &self,
431 command_id: &str,
432 request: Value,
433 ) -> Result<Value, CommandError> {
434 let result = self
435 .execute_raw_impl(command_id.to_string(), request)
436 .await?;
437 Ok(result.unwrap_or(Value::Null))
438 }
439
440 async fn execute_raw_impl(
441 &self,
442 command_id: String,
443 request: Value,
444 ) -> Result<Option<Value>, CommandError> {
445 self.inner.execute_replies.sweep_expired();
448 self.inner.register_replies.sweep_expired();
449 let local_handler = self
451 .inner
452 .local
453 .lock()
454 .get(&command_id)
455 .map(|entry| entry.handler.clone());
456 if let Some(handler) = local_handler {
457 return handler(request)
458 .await
459 .map(Some)
460 .map_err(|e| e.into_command_error(&command_id));
461 }
462
463 let remote_target = self.inner.remote.lock().get(&command_id).cloned();
465 let target = match remote_target {
466 Some(t) => Some(t),
467 None => self.inner.router_channel.clone(),
468 };
469
470 let Some(target_id) = target else {
471 return Err(CommandError::NotFound(command_id));
472 };
473
474 let channel = self.inner.channels.lock().get(&target_id).cloned();
475 let Some(channel) = channel else {
476 return Err(CommandError::ChannelDisconnected);
477 };
478
479 self.forward_execute(command_id, request, &channel, target_id)
480 .await
481 }
482
483 async fn forward_execute(
484 &self,
485 command_id: String,
486 request: Value,
487 channel: &Arc<dyn CommandChannel>,
488 target_id: String,
489 ) -> Result<Option<Value>, CommandError> {
490 let req_id = MessageId::new_v4();
491 let (tx, rx) = oneshot::channel();
492 self.inner.execute_replies.insert(
493 req_id,
494 PendingExecute {
495 tx,
496 target_channel: target_id,
497 },
498 );
499 channel
500 .send(Message::ExecuteCommandRequest {
501 id: req_id,
502 command_id: command_id.clone(),
503 request: value_to_wire(request),
507 })
508 .map_err(|_| CommandError::ChannelDisconnected)?;
509
510 match rx.await {
511 Ok(ExecuteResult::Ok { result, .. }) => Ok(result),
512 Ok(ExecuteResult::Err { error, .. }) => Err(error_to_command_error(error, &command_id)),
513 Err(_) => {
514 self.inner.execute_replies.remove(&req_id);
515 Err(CommandError::ChannelDisconnected)
516 }
517 }
518 }
519
520 pub fn emit<E: Event>(&self, event: E) -> Result<(), CommandError> {
529 let event_id = event.id().to_string();
530 let payload_value = serde_json::to_value(&event)?;
531 let msg_id = MessageId::new_v4();
532 self.inner.seen_events.insert(msg_id, ());
533
534 self.dispatch_event_locally(&event_id, &payload_value);
535
536 if !event_id.starts_with('_') {
537 let channels: Vec<Arc<dyn CommandChannel>> =
538 self.inner.channels.lock().values().cloned().collect();
539 let wire_payload = value_to_wire(payload_value);
542 for ch in channels {
543 let _ = ch.send(Message::Event {
544 id: msg_id,
545 event_id: event_id.clone(),
546 payload: wire_payload.clone(),
547 });
548 }
549 }
550 Ok(())
551 }
552
553 pub fn on<E: Event + serde::de::DeserializeOwned>(
566 &self,
567 listener: impl Fn(E) + Send + Sync + 'static,
568 ) -> impl FnOnce() + Send + Sync + 'static {
569 self.install_listener(E::ID, move |value| {
570 if let Ok(typed) = serde_json::from_value::<E>(value) {
571 listener(typed);
572 }
573 })
574 }
575
576 pub fn on_dyn<F>(
584 &self,
585 event_id: impl Into<String>,
586 listener: F,
587 ) -> impl FnOnce() + Send + Sync + 'static
588 where
589 F: Fn(Value) + Send + Sync + 'static,
590 {
591 self.install_listener(&event_id.into(), listener)
592 }
593
594 fn install_listener<F>(
595 &self,
596 event_id: &str,
597 listener: F,
598 ) -> impl FnOnce() + Send + Sync + 'static
599 where
600 F: Fn(Value) + Send + Sync + 'static,
601 {
602 let token = self
603 .inner
604 .next_listener_token
605 .fetch_add(1, Ordering::Relaxed);
606 self.inner
607 .event_listeners
608 .lock()
609 .entry(event_id.to_string())
610 .or_default()
611 .insert(token, Arc::new(listener));
612
613 let inner = Arc::clone(&self.inner);
614 let event_id = event_id.to_string();
615 move || {
616 let mut map = inner.event_listeners.lock();
617 if let Some(slot) = map.get_mut(&event_id) {
618 slot.remove(&token);
619 if slot.is_empty() {
620 map.remove(&event_id);
621 }
622 }
623 }
624 }
625
626 fn dispatch_event_locally(&self, event_id: &str, payload: &Value) {
627 let listeners: Vec<EventListener> = self
628 .inner
629 .event_listeners
630 .lock()
631 .get(event_id)
632 .map(|m| m.values().cloned().collect())
633 .unwrap_or_default();
634 for l in listeners {
635 l(payload.clone());
636 }
637 }
638
639 pub async fn dispose(&self) {
654 let channels: Vec<Arc<dyn CommandChannel>> = {
657 let mut locked = self.inner.channels.lock();
658 let out: Vec<_> = locked.values().cloned().collect();
659 locked.clear();
660 out
661 };
662
663 for ch in channels {
669 ch.close().await;
670 }
671
672 self.inner.local.lock().clear();
673 self.inner.remote.lock().clear();
674 self.inner.remote_defs.lock().clear();
675 self.inner.event_listeners.lock().clear();
676 }
677}
678
679impl Inner {
680 fn local_command_defs(&self) -> Vec<CommandDef> {
681 self.local
682 .lock()
683 .values()
684 .filter(|e| !e.is_private)
685 .map(|e| e.def.clone())
686 .collect()
687 }
688
689 async fn handle_message(inner: Arc<Self>, channel: Arc<dyn CommandChannel>, msg: Message) {
691 match msg {
692 Message::RegisterCommandRequest { id, command } => {
693 Self::handle_register_request(inner, channel, id, command).await;
694 }
695 Message::RegisterCommandResponse { thid, response, .. } => {
696 if let Some(pending) = inner.register_replies.remove(&thid) {
697 let _ = pending.tx.send(RegisterOutcome::Wire(response));
698 }
699 }
700 Message::ListCommandsRequest { id } => {
701 let commands = inner.local_command_defs();
702 let _ = channel.send(Message::ListCommandsResponse {
703 id: MessageId::new_v4(),
704 thid: id,
705 commands,
706 });
707 }
708 Message::ListCommandsResponse { commands, .. } => {
709 let channel_id = channel.id().to_string();
710 let mut remote = inner.remote.lock();
711 let mut remote_defs = inner.remote_defs.lock();
712 for cmd in commands {
713 let cmd = CommandDef {
716 id: cmd.id,
717 description: cmd.description,
718 schema: cmd.schema.map(crate::schema::normalize_command_schema),
719 };
720 let entry_is_new = !remote.contains_key(&cmd.id);
721 if entry_is_new {
722 remote.insert(cmd.id.clone(), channel_id.clone());
723 }
724 remote_defs.insert(cmd.id.clone(), cmd);
726 }
727 }
728 Message::ExecuteCommandRequest {
729 id,
730 command_id,
731 request,
732 } => {
733 Self::handle_execute_request(
734 inner,
735 channel,
736 id,
737 command_id,
738 request.unwrap_or(Value::Null),
739 )
740 .await;
741 }
742 Message::ExecuteCommandResponse { thid, response, .. } => {
743 Self::handle_execute_response(&inner, thid, response);
744 }
745 Message::Event {
746 id,
747 event_id,
748 payload,
749 } => {
750 Self::handle_event(&inner, channel, id, event_id, payload);
751 }
752 }
753 }
754
755 async fn handle_register_request(
756 inner: Arc<Self>,
757 channel: Arc<dyn CommandChannel>,
758 req_id: MessageId,
759 command: CommandDef,
760 ) {
761 let command = CommandDef {
765 id: command.id,
766 description: command.description,
767 schema: command.schema.map(crate::schema::normalize_command_schema),
768 };
769 let channel_id = channel.id().to_string();
770 let command_id = command.id.clone();
771
772 let dup = inner.local.lock().contains_key(&command_id);
774 if dup {
775 let _ = channel.send(Message::RegisterCommandResponse {
776 id: MessageId::new_v4(),
777 thid: req_id,
778 response: RegisterResult::Err {
779 ok: False,
780 error: RegisterErrorCode::DuplicateCommand,
781 },
782 });
783 return;
784 }
785
786 let existing_owner = inner.remote.lock().get(&command_id).cloned();
794 match existing_owner {
795 Some(owner) if owner == channel_id => {
796 inner.remote_defs.lock().insert(command_id, command);
798 let _ = channel.send(Message::RegisterCommandResponse {
799 id: MessageId::new_v4(),
800 thid: req_id,
801 response: RegisterResult::Ok { ok: True },
802 });
803 return;
804 }
805 Some(_) => {
806 let _ = channel.send(Message::RegisterCommandResponse {
807 id: MessageId::new_v4(),
808 thid: req_id,
809 response: RegisterResult::Err {
810 ok: False,
811 error: RegisterErrorCode::DuplicateCommand,
812 },
813 });
814 return;
815 }
816 None => {}
817 }
818
819 if let Some(router_id) = inner.router_channel.clone() {
821 if router_id != channel_id {
822 let router_ch = inner.channels.lock().get(&router_id).cloned();
823 if let Some(router_ch) = router_ch {
824 let up_id = MessageId::new_v4();
825 let (tx, rx) = oneshot::channel();
826 inner.register_replies.insert(
827 up_id,
828 PendingRegister {
829 tx,
830 target_channel: router_id,
831 },
832 );
833 if router_ch
834 .send(Message::RegisterCommandRequest {
835 id: up_id,
836 command: command.clone(),
837 })
838 .is_ok()
839 {
840 let up = rx.await;
841 match up {
842 Ok(RegisterOutcome::Wire(RegisterResult::Ok { .. })) => {}
843 Ok(RegisterOutcome::Wire(RegisterResult::Err { error, .. })) => {
844 let _ = channel.send(Message::RegisterCommandResponse {
845 id: MessageId::new_v4(),
846 thid: req_id,
847 response: RegisterResult::Err { ok: False, error },
848 });
849 return;
850 }
851 Ok(RegisterOutcome::Timeout)
860 | Ok(RegisterOutcome::Disconnected)
861 | Err(_) => {
862 let _ = channel.send(Message::RegisterCommandResponse {
863 id: MessageId::new_v4(),
864 thid: req_id,
865 response: RegisterResult::Err {
866 ok: False,
867 error: RegisterErrorCode::DuplicateCommand,
868 },
869 });
870 return;
871 }
872 }
873 }
874 }
875 }
876 }
877
878 inner.remote.lock().insert(command_id.clone(), channel_id);
879 inner.remote_defs.lock().insert(command_id, command);
880 let _ = channel.send(Message::RegisterCommandResponse {
881 id: MessageId::new_v4(),
882 thid: req_id,
883 response: RegisterResult::Ok { ok: True },
884 });
885 }
886
887 async fn handle_execute_request(
888 inner: Arc<Self>,
889 origin: Arc<dyn CommandChannel>,
890 req_id: MessageId,
891 command_id: String,
892 request: Value,
893 ) {
894 let handler = inner
896 .local
897 .lock()
898 .get(&command_id)
899 .map(|e| e.handler.clone());
900 if let Some(handler) = handler {
901 let result = handler(request).await;
902 let response = match result {
903 Ok(v) => ExecuteResult::Ok {
904 ok: True,
905 result: value_to_wire(v),
908 },
909 Err(error) => ExecuteResult::Err { ok: False, error },
910 };
911 let _ = origin.send(Message::ExecuteCommandResponse {
912 id: MessageId::new_v4(),
913 thid: req_id,
914 response,
915 });
916 return;
917 }
918
919 let target_id = inner
921 .remote
922 .lock()
923 .get(&command_id)
924 .cloned()
925 .or_else(|| inner.router_channel.clone());
926
927 let origin_id = origin.id().to_string();
928 let Some(target_id) = target_id else {
929 let _ = origin.send(Message::ExecuteCommandResponse {
930 id: MessageId::new_v4(),
931 thid: req_id,
932 response: ExecuteResult::Err {
933 ok: False,
934 error: ExecuteError {
935 code: ExecuteErrorCode::NotFound,
936 message: format!("command not found: {command_id}"),
937 },
938 },
939 });
940 return;
941 };
942
943 if target_id == origin_id {
944 let _ = origin.send(Message::ExecuteCommandResponse {
946 id: MessageId::new_v4(),
947 thid: req_id,
948 response: ExecuteResult::Err {
949 ok: False,
950 error: ExecuteError {
951 code: ExecuteErrorCode::NotFound,
952 message: format!("command not found: {command_id}"),
953 },
954 },
955 });
956 return;
957 }
958
959 let target = inner.channels.lock().get(&target_id).cloned();
960 let Some(target) = target else {
961 let _ = origin.send(Message::ExecuteCommandResponse {
962 id: MessageId::new_v4(),
963 thid: req_id,
964 response: ExecuteResult::Err {
965 ok: False,
966 error: ExecuteError {
967 code: ExecuteErrorCode::ChannelDisconnected,
968 message: "target channel disconnected".into(),
969 },
970 },
971 });
972 return;
973 };
974
975 inner.routes.insert(
976 req_id,
977 RouteEntry {
978 origin_channel: origin_id,
979 target_channel: target_id,
980 },
981 );
982 let _ = target.send(Message::ExecuteCommandRequest {
983 id: req_id,
984 command_id,
985 request: value_to_wire(request),
986 });
987 }
988
989 fn handle_execute_response(inner: &Arc<Self>, thid: MessageId, response: ExecuteResult) {
990 if let Some(pending) = inner.execute_replies.remove(&thid) {
992 let _ = pending.tx.send(response);
993 return;
994 }
995
996 if let Some(route) = inner.routes.remove(&thid) {
998 let origin = inner.channels.lock().get(&route.origin_channel).cloned();
999 if let Some(origin) = origin {
1000 let _ = origin.send(Message::ExecuteCommandResponse {
1001 id: MessageId::new_v4(),
1002 thid,
1003 response,
1004 });
1005 }
1006 }
1007 }
1008
1009 fn handle_event(
1010 inner: &Arc<Self>,
1011 origin: Arc<dyn CommandChannel>,
1012 msg_id: MessageId,
1013 event_id: String,
1014 payload: Option<Value>,
1015 ) {
1016 if inner.seen_events.contains_key(&msg_id) {
1017 return;
1018 }
1019 inner.seen_events.insert(msg_id, ());
1020
1021 let payload_value = payload.clone().unwrap_or(Value::Null);
1022 let listeners: Vec<EventListener> = inner
1023 .event_listeners
1024 .lock()
1025 .get(&event_id)
1026 .map(|m| m.values().cloned().collect())
1027 .unwrap_or_default();
1028 for l in listeners {
1029 l(payload_value.clone());
1030 }
1031
1032 if event_id.starts_with('_') {
1033 return;
1034 }
1035
1036 let channels: Vec<Arc<dyn CommandChannel>> = inner
1037 .channels
1038 .lock()
1039 .iter()
1040 .filter(|(k, _)| k.as_str() != origin.id())
1041 .map(|(_, v)| v.clone())
1042 .collect();
1043 for ch in channels {
1044 let _ = ch.send(Message::Event {
1045 id: msg_id,
1046 event_id: event_id.clone(),
1047 payload: payload.clone(),
1048 });
1049 }
1050 }
1051
1052 fn handle_channel_close(inner: &Arc<Self>, channel_id: &str) {
1054 inner.channels.lock().remove(channel_id);
1056
1057 let dropped_ids: Vec<String> = {
1060 let mut remote = inner.remote.lock();
1061 let to_drop: Vec<String> = remote
1062 .iter()
1063 .filter(|(_, owner)| *owner == channel_id)
1064 .map(|(id, _)| id.clone())
1065 .collect();
1066 for id in &to_drop {
1067 remote.remove(id);
1068 }
1069 to_drop
1070 };
1071 let mut remote_defs = inner.remote_defs.lock();
1072 for id in dropped_ids {
1073 remote_defs.remove(&id);
1074 }
1075 drop(remote_defs);
1076
1077 let exec_ids: Vec<MessageId> = inner
1080 .execute_replies
1081 .snapshot_keys_where(|v| v.target_channel == channel_id);
1082 for id in exec_ids {
1083 if let Some(pending) = inner.execute_replies.remove(&id) {
1084 let _ = pending.tx.send(ExecuteResult::Err {
1085 ok: False,
1086 error: ExecuteError {
1087 code: ExecuteErrorCode::ChannelDisconnected,
1088 message: "channel disconnected".into(),
1089 },
1090 });
1091 }
1092 }
1093
1094 let reg_ids: Vec<MessageId> = inner
1095 .register_replies
1096 .snapshot_keys_where(|v| v.target_channel == channel_id);
1097 for id in reg_ids {
1098 if let Some(pending) = inner.register_replies.remove(&id) {
1099 let _ = pending.tx.send(RegisterOutcome::Disconnected);
1100 }
1101 }
1102
1103 let route_ids: Vec<MessageId> = inner.routes.snapshot_keys_where(|r| {
1106 r.origin_channel == channel_id || r.target_channel == channel_id
1107 });
1108 for id in route_ids {
1109 if let Some(route) = inner.routes.remove(&id) {
1110 if route.origin_channel == channel_id {
1111 continue;
1112 }
1113 let origin = inner.channels.lock().get(&route.origin_channel).cloned();
1114 if let Some(origin) = origin {
1115 let _ = origin.send(Message::ExecuteCommandResponse {
1116 id: MessageId::new_v4(),
1117 thid: id,
1118 response: ExecuteResult::Err {
1119 ok: False,
1120 error: ExecuteError {
1121 code: ExecuteErrorCode::ChannelDisconnected,
1122 message: "target channel disconnected".into(),
1123 },
1124 },
1125 });
1126 }
1127 }
1128 }
1129 }
1130}
1131
1132fn command_error_to_execute(e: &CommandError, command_id: &str) -> ExecuteError {
1135 match e {
1136 CommandError::InvalidRequest { message, .. } => ExecuteError {
1137 code: ExecuteErrorCode::InvalidRequest,
1138 message: message.clone(),
1139 },
1140 CommandError::Internal { message, .. } => ExecuteError {
1141 code: ExecuteErrorCode::InternalError,
1142 message: message.clone(),
1143 },
1144 CommandError::Timeout => ExecuteError {
1145 code: ExecuteErrorCode::Timeout,
1146 message: "request timed out".into(),
1147 },
1148 CommandError::ChannelDisconnected => ExecuteError {
1149 code: ExecuteErrorCode::ChannelDisconnected,
1150 message: "channel disconnected".into(),
1151 },
1152 CommandError::NotFound(id) => ExecuteError {
1153 code: ExecuteErrorCode::NotFound,
1154 message: format!("command not found: {id}"),
1155 },
1156 _ => ExecuteError {
1157 code: ExecuteErrorCode::InternalError,
1158 message: format!("{e} [command {command_id}]"),
1159 },
1160 }
1161}
1162
1163fn error_to_command_error(err: ExecuteError, command_id: &str) -> CommandError {
1164 match err.code {
1165 ExecuteErrorCode::NotFound => CommandError::NotFound(command_id.into()),
1166 ExecuteErrorCode::InvalidRequest => CommandError::InvalidRequest {
1167 command_id: command_id.into(),
1168 message: err.message,
1169 },
1170 ExecuteErrorCode::InternalError => CommandError::Internal {
1171 command_id: command_id.into(),
1172 message: err.message,
1173 },
1174 ExecuteErrorCode::Timeout => CommandError::Timeout,
1175 ExecuteErrorCode::ChannelDisconnected => CommandError::ChannelDisconnected,
1176 }
1177}
1178
1179impl ExecuteError {
1181 fn into_command_error(self, command_id: &str) -> CommandError {
1182 error_to_command_error(self, command_id)
1183 }
1184}
1185
1186fn value_to_wire(v: Value) -> Option<Value> {
1195 if v.is_null() {
1196 None
1197 } else {
1198 Some(v)
1199 }
1200}
1201
1202fn value_from_request<T: Serialize>(v: &T) -> Result<Value, CommandError> {
1205 serde_json::to_value(v).map_err(CommandError::Serde)
1206}